From 5929ab38b74f7f8328412e220f52eaf3d34743a6 Mon Sep 17 00:00:00 2001 From: jimingquan Date: Wed, 26 Jul 2023 15:24:44 +0800 Subject: [PATCH] add path limit --- src/graph/context/ast/QueryAstContext.h | 1 + .../executor/algo/BFSShortestPathExecutor.cpp | 43 ++++++-- .../executor/algo/BFSShortestPathExecutor.h | 2 + .../algo/MultiShortestPathExecutor.cpp | 8 +- .../rule/PushLimitDownShortestPathRule.cpp | 102 ++++++++++++++++++ .../rule/PushLimitDownShortestPathRule.h | 72 +++++++++++++ src/graph/planner/ngql/PathPlanner.cpp | 2 + src/graph/planner/plan/Algo.cpp | 1 - src/graph/validator/FindPathValidator.cpp | 4 + src/parser/Clauses.h | 15 +++ src/parser/TraverseSentences.cpp | 4 + src/parser/TraverseSentences.h | 9 ++ src/parser/parser.yy | 12 ++- 13 files changed, 259 insertions(+), 16 deletions(-) create mode 100644 src/graph/optimizer/rule/PushLimitDownShortestPathRule.cpp create mode 100644 src/graph/optimizer/rule/PushLimitDownShortestPathRule.h diff --git a/src/graph/context/ast/QueryAstContext.h b/src/graph/context/ast/QueryAstContext.h index 3219ecf1971..7af50d29891 100644 --- a/src/graph/context/ast/QueryAstContext.h +++ b/src/graph/context/ast/QueryAstContext.h @@ -42,6 +42,7 @@ struct PathContext final : AstContext { Starts to; StepClause steps; Over over; + int64_t limit{-1}; Expression* filter{nullptr}; std::vector colNames; diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.cpp b/src/graph/executor/algo/BFSShortestPathExecutor.cpp index b098145fbe4..10d66fad128 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.cpp +++ b/src/graph/executor/algo/BFSShortestPathExecutor.cpp @@ -43,11 +43,20 @@ folly::Future BFSShortestPathExecutor::execute() { futures.emplace_back(std::move(leftFuture)); futures.emplace_back(std::move(rightFuture)); - return folly::collect(futures) + return folly::collectAll(futures) .via(runner()) - .thenValue([this](auto&& status) { + .thenValue([this](std::vector>&& resps) { + for (auto& respVal : resps) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + } memory::MemoryCheckGuard guard; - UNUSED(status); return conjunctPath(); }) .thenValue([this](auto&& status) { @@ -166,13 +175,23 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { } } - return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { - memory::MemoryCheckGuard guard; - for (auto& resp : resps) { - currentDs_.append(std::move(resp)); - } - return Status::OK(); - }); + return folly::collectAll(futures).via(runner()).thenValue( + [this](std::vector>&& resps) { + memory::MemoryCheckGuard guard; + for (auto& respVal : resps) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto resp = std::move(respVal).value(); + currentDs_.append(std::move(resp)); + } + return Status::OK(); + }); } DataSet BFSShortestPathExecutor::doConjunct(const std::vector& meetVids, @@ -189,6 +208,10 @@ DataSet BFSShortestPathExecutor::doConjunct(const std::vector& meetVids, Row row; row.emplace_back(std::move(result)); ds.rows.emplace_back(std::move(row)); + cnt_.fetch_add(1, std::memory_order_relaxed); + if (cnt_.load(std::memory_order_relaxed) > limit_) { + return ds; + } } } return ds; diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.h b/src/graph/executor/algo/BFSShortestPathExecutor.h index c408743d1f6..f567137a5f3 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.h +++ b/src/graph/executor/algo/BFSShortestPathExecutor.h @@ -70,6 +70,8 @@ class BFSShortestPathExecutor final : public Executor { std::vector> allRightEdges_; DataSet currentDs_; std::string terminateEarlyVar_; + size_t limit_{std::numeric_limits::max()}; + std::atomic cnt_{0}; }; } // namespace graph } // namespace nebula diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.cpp b/src/graph/executor/algo/MultiShortestPathExecutor.cpp index 4d4c372adcd..2092bf8e566 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.cpp +++ b/src/graph/executor/algo/MultiShortestPathExecutor.cpp @@ -239,10 +239,6 @@ DataSet MultiShortestPathExecutor::doConjunct( [this](const std::vector& leftPaths, const std::vector& rightPaths, DataSet& ds) { for (const auto& leftPath : leftPaths) { for (const auto& rightPath : rightPaths) { - cnt_.fetch_add(1, std::memory_order_relaxed); - if (cnt_.load(std::memory_order_relaxed) > limit_) { - break; - } auto forwardPath = leftPath; auto backwardPath = rightPath; backwardPath.reverse(); @@ -250,6 +246,10 @@ DataSet MultiShortestPathExecutor::doConjunct( Row row; row.values.emplace_back(std::move(forwardPath)); ds.rows.emplace_back(std::move(row)); + cnt_.fetch_add(1, std::memory_order_relaxed); + if (cnt_.load(std::memory_order_relaxed) > limit_) { + break; + } } } }; diff --git a/src/graph/optimizer/rule/PushLimitDownShortestPathRule.cpp b/src/graph/optimizer/rule/PushLimitDownShortestPathRule.cpp new file mode 100644 index 00000000000..0952e3b30f6 --- /dev/null +++ b/src/graph/optimizer/rule/PushLimitDownShortestPathRule.cpp @@ -0,0 +1,102 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/optimizer/rule/PushLimitDownShortestPathRule.h" + +#include "graph/optimizer/OptContext.h" +#include "graph/optimizer/OptGroup.h" +#include "graph/planner/plan/Algo.h" +#include "graph/planner/plan/PlanNode.h" +#include "graph/planner/plan/Query.h" + +using nebula::graph::BFSShortestPath; +using nebula::graph::Limit; +using nebula::graph::MultiShortestPath; +using nebula::graph::PlanNode; +using nebula::graph::QueryContext; + +namespace nebula { +namespace opt { +// transform Limit->DataCollect->Loop->BFS/MultiShortest +// to DataCollect->Loop->BFS/MultiShortest(limit) +std::unique_ptr PushLimitDownShortestPathRule::kInstance = + std::unique_ptr(new PushLimitDownShortestPathRule()); + +PushLimitDownShortestPathRule::PushLimitDownShortestPathRule() { + RuleSet::QueryRules().addRule(this); +} + +const Pattern &PushLimitDownShortestPathRule::pattern() const { + static Pattern pattern = Pattern::create( + graph::PlanNode::Kind::kLimit, + {Pattern::create( + graph::PlanNode::Kind::kDataCollect, + {Pattern::create(graph::PlanNode::Kind::kLoop, + {Pattern::create({graph::PlanNode::Kind::kBFSShortest, + graph::PlanNode::Kind::kMultiShortestPath})})})}); + return pattern; +} + +StatusOr PushLimitDownShortestPathRule::transform( + OptContext *ctx, const MatchedResult &matched) const { + auto qctx = ctx->qctx(); + auto limitGroupNode = matched.node; + auto dataCollectGroupNode = matched.dependencies.front().node; + auto loopGroupNode = matched.dependencies.front().dependencies.front().node; + auto pathGroupNode = matched.dependencies.front().dependencies.front().dependencies.front().node; + + const auto limit = static_cast(limitGroupNode->node()); + const auto dataCollect = static_cast(dataCollectGroupNode->node()); + const auto loop = static_cast(loopGroupNode->node()); + auto pathNode = pathGroupNode->node(); + auto pathKind = pathNode->kind(); + + int64_t pathLimit = -1; + if (pathKind == PlanNode::Kind::kBFSShortest) { + pathLimit = static_cast(pathNode)->limit(); + } else { + pathLimit = static_cast(pathNode)->limit(); + } + + int64_t limitRows = limit->offset() + limit->count(qctx); + if (pathLimit >= 0 && limitRows >= pathLimit) { + return TransformResult::noTransform(); + } + + auto newDataCollect = static_cast(dataCollect->clone()); + newDataCollect->setOutputVar(limit->outputVar()); + auto newDataCollectGroupNode = OptGroupNode::create(ctx, newDataCollect, limitGroupNode->group()); + + auto newLoop = static_cast(loop->clone()); + auto newLoopGroup = OptGroup::create(ctx); + auto newLoopGroupNode = newLoopGroup->makeGroupNode(newLoop); + + auto newPath = static_cast(pathNode->clone()); + if (pathKind == PlanNode::Kind::kBFSShortest) { + static_cast(newPath)->setLimit(limitRows); + } else { + static_cast(newPath)->setLimit(limitRows); + } + auto newPathGroup = OptGroup::create(ctx); + auto newPathGroupNode = newPathGroup->makeGroupNode(newPath); + + newDataCollectGroupNode->dependsOn(newLoopGroup); + + for (auto dep : pathGroupNode->dependencies()) { + newPathGroupNode->dependsOn(dep); + } + + TransformResult result; + result.eraseAll = true; + result.newGroupNodes.emplace_back(newPathGroupNode); + return result; +} + +std::string PushLimitDownShortestPathRule::toString() const { + return "PushLimitDownShortestPathRule"; +} + +} // namespace opt +} // namespace nebula diff --git a/src/graph/optimizer/rule/PushLimitDownShortestPathRule.h b/src/graph/optimizer/rule/PushLimitDownShortestPathRule.h new file mode 100644 index 00000000000..28ffcdb4378 --- /dev/null +++ b/src/graph/optimizer/rule/PushLimitDownShortestPathRule.h @@ -0,0 +1,72 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_OPTIMIZER_RULE_PUSHLIMITDOWNSHORTESTPATHRULE_H +#define GRAPH_OPTIMIZER_RULE_PUSHLIMITDOWNSHORTESTPATHRULE_H + +#include "graph/optimizer/OptRule.h" + +namespace nebula { +namespace opt { + +// Push [[Limit]] down [[BFSShortestPath / MulitShortestPath]] +// Required conditions: +// 1. Match the pattern +// Benefits: +// 1. Limit data early to optimize performance +// +// Tranformation: +// Before: +// +// +--------+--------+ +// | Limit | +// | (limit=3) | +// +--------+--------+ +// | +// +---------+---------+ +// | DataCollect | +// +---------+---------+ +// | +// +---------+---------+ +// | Loop | +// +---------+---------+ +// | +// +---------+---------+ +// | ShortestPath | +// +---------+---------+ +// +// After: +// +// +---------+---------+ +// | DataCollect | +// +---------+---------+ +// | +// +---------+---------+ +// | Loop | +// +---------+---------+ +// | +// +--------+--------+ +// | ShortestPaths | +// | (limit=3) | +// +--------+--------+ + +class PushLimitDownShortestPathRule final : public OptRule { + public: + const Pattern &pattern() const override; + + StatusOr transform(OptContext *ctx, + const MatchedResult &matched) const override; + + std::string toString() const override; + + private: + PushLimitDownShortestPathRule(); + + static std::unique_ptr kInstance; +}; + +} // namespace opt +} // namespace nebula +#endif diff --git a/src/graph/planner/ngql/PathPlanner.cpp b/src/graph/planner/ngql/PathPlanner.cpp index 563b3316553..ee5a2c4bc40 100644 --- a/src/graph/planner/ngql/PathPlanner.cpp +++ b/src/graph/planner/ngql/PathPlanner.cpp @@ -117,6 +117,7 @@ SubPlan PathPlanner::singlePairPlan(PlanNode* left, PlanNode* right) { path->setRightVidVar(pathCtx_->toVidsVar); path->setColNames({kPathStr}); path->setTerminateEarlyVar(terminateEarlyVar); + path->setLimit(pathCtx_->limit); auto* loopCondition = singlePairLoopCondition(steps, path->outputVar(), terminateEarlyVar); auto* loop = Loop::make(qctx, nullptr, path, loopCondition); @@ -221,6 +222,7 @@ SubPlan PathPlanner::multiPairPlan(PlanNode* left, PlanNode* right) { path->setRightVidVar(pathCtx_->toVidsVar); path->setTerminationVar(terminationVar); path->setColNames({kPathStr}); + path->setLimit(pathCtx_->limit); SubPlan loopDep = loopDepPlan(); auto* loopCondition = multiPairLoopCondition(steps, terminationVar); diff --git a/src/graph/planner/plan/Algo.cpp b/src/graph/planner/plan/Algo.cpp index b08afcddb8c..ff70be3344b 100644 --- a/src/graph/planner/plan/Algo.cpp +++ b/src/graph/planner/plan/Algo.cpp @@ -11,7 +11,6 @@ #include "graph/util/ToJson.h" namespace nebula { namespace graph { - PlanNode* AllPaths::clone() const { auto* path = AllPaths::make(qctx_, nullptr, nullptr, space_, steps_, noLoop_, withProp_); path->cloneMembers(*this); diff --git a/src/graph/validator/FindPathValidator.cpp b/src/graph/validator/FindPathValidator.cpp index 15d46d31ca5..a171682cd8c 100644 --- a/src/graph/validator/FindPathValidator.cpp +++ b/src/graph/validator/FindPathValidator.cpp @@ -26,6 +26,10 @@ Status FindPathValidator::validateImpl() { NG_RETURN_IF_ERROR(validateWhere(fpSentence->where())); NG_RETURN_IF_ERROR(ValidateUtil::validateStep(fpSentence->step(), pathCtx_->steps)); NG_RETURN_IF_ERROR(validateYield(fpSentence->yield())); + auto limitClause = fpSentence->limit(); + if (limitClause != nullptr) { + pathCtx_->limit = limitClause->limit(); + } return Status::OK(); } diff --git a/src/parser/Clauses.h b/src/parser/Clauses.h index 0d275996028..0807ff76127 100644 --- a/src/parser/Clauses.h +++ b/src/parser/Clauses.h @@ -10,6 +10,21 @@ #include "interface/gen-cpp2/storage_types.h" namespace nebula { +class LimitClause final { + public: + explicit LimitClause(int64_t limit) : limit_(limit) {} + + std::string toString() const { + return folly::stringPrintf("LIMIT %ld", limit_); + } + + int64_t limit() const { + return limit_; + } + + private: + int64_t limit_{-1}; +}; class StepClause final { public: explicit StepClause(uint32_t steps = 1) { diff --git a/src/parser/TraverseSentences.cpp b/src/parser/TraverseSentences.cpp index fcd12a862ea..82c9252cd14 100644 --- a/src/parser/TraverseSentences.cpp +++ b/src/parser/TraverseSentences.cpp @@ -233,6 +233,10 @@ std::string FindPathSentence::toString() const { buf += " "; buf += yield_->toString(); } + if (limit_ != nullptr) { + buf += " "; + buf += limit_->toString(); + } return buf; } diff --git a/src/parser/TraverseSentences.h b/src/parser/TraverseSentences.h index b4ef8bb11f0..f607e304ae1 100644 --- a/src/parser/TraverseSentences.h +++ b/src/parser/TraverseSentences.h @@ -425,6 +425,10 @@ class FindPathSentence final : public Sentence { noLoop_ = noLoop; } + void setLimit(LimitClause* clause) { + limit_.reset(clause); + } + void setFrom(FromClause* clause) { from_.reset(clause); } @@ -469,6 +473,10 @@ class FindPathSentence final : public Sentence { return where_.get(); } + LimitClause* limit() const { + return limit_.get(); + } + YieldClause* yield() const { return yield_.get(); } @@ -497,6 +505,7 @@ class FindPathSentence final : public Sentence { std::unique_ptr step_; std::unique_ptr where_; std::unique_ptr yield_; + std::unique_ptr limit_; }; class LimitSentence final : public Sentence { diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 85ab5dcd8b6..f8b90d257b7 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -78,6 +78,7 @@ using namespace nebula; nebula::ColumnNameList *column_name_list; nebula::StepClause *step_clause; nebula::StepClause *find_path_upto_clause; + nebula::LimitClause *find_path_limit_clause; nebula::FromClause *from_clause; nebula::ToClause *to_clause; nebula::VertexIDList *vid_list; @@ -302,6 +303,7 @@ using namespace nebula; %type edge_key_ref %type to_clause %type find_path_upto_clause +%type find_path_limit_clause %type group_clause %type host_list %type host_item @@ -2215,7 +2217,7 @@ find_path_sentence s->setYield($10); $$ = s; } - | KW_FIND KW_SHORTEST KW_PATH opt_with_properties from_clause to_clause over_clause where_clause find_path_upto_clause yield_clause { + | KW_FIND KW_SHORTEST KW_PATH opt_with_properties from_clause to_clause over_clause where_clause find_path_upto_clause yield_clause find_path_limit_clause { auto *s = new FindPathSentence(true, $4, false); s->setFrom($5); s->setTo($6); @@ -2223,6 +2225,7 @@ find_path_sentence s->setWhere($8); s->setStep($9); s->setYield($10); + s->setLimit($11); $$ = s; } | KW_FIND KW_NOLOOP KW_PATH opt_with_properties from_clause to_clause over_clause where_clause find_path_upto_clause yield_clause { @@ -2249,6 +2252,13 @@ find_path_upto_clause } ; +find_path_limit_clause + : %empty { $$ = new LimitClause(-1); } + | KW_LIMIT legal_integer { + $$ = new LimitClause($2); + } + ; + to_clause : KW_TO vid_list { $$ = new ToClause($2);