Skip to content

Commit

Permalink
add path limit
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Jul 26, 2023
1 parent bec96a9 commit 5929ab3
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct PathContext final : AstContext {
Starts to;
StepClause steps;
Over over;
int64_t limit{-1};
Expression* filter{nullptr};
std::vector<std::string> colNames;

Expand Down
43 changes: 33 additions & 10 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,20 @@ folly::Future<Status> BFSShortestPathExecutor::execute() {
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this](auto&& status) {
.thenValue([this](std::vector<folly::Try<Status>>&& resps) {
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
}
memory::MemoryCheckGuard guard;
UNUSED(status);
return conjunctPath();
})
.thenValue([this](auto&& status) {
Expand Down Expand Up @@ -166,13 +175,23 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
}
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return Status::OK();
});
return folly::collectAll(futures).via(runner()).thenValue(
[this](std::vector<folly::Try<DataSet>>&& resps) {
memory::MemoryCheckGuard guard;
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
currentDs_.append(std::move(resp));
}
return Status::OK();
});
}

DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
Expand All @@ -189,6 +208,10 @@ DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
Row row;
row.emplace_back(std::move(result));
ds.rows.emplace_back(std::move(row));
cnt_.fetch_add(1, std::memory_order_relaxed);
if (cnt_.load(std::memory_order_relaxed) > limit_) {
return ds;
}
}
}
return ds;
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/algo/BFSShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class BFSShortestPathExecutor final : public Executor {
std::vector<std::unordered_multimap<Value, Edge>> allRightEdges_;
DataSet currentDs_;
std::string terminateEarlyVar_;
size_t limit_{std::numeric_limits<size_t>::max()};
std::atomic<size_t> cnt_{0};
};
} // namespace graph
} // namespace nebula
Expand Down
8 changes: 4 additions & 4 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,17 @@ DataSet MultiShortestPathExecutor::doConjunct(
[this](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
for (const auto& leftPath : leftPaths) {
for (const auto& rightPath : rightPaths) {
cnt_.fetch_add(1, std::memory_order_relaxed);
if (cnt_.load(std::memory_order_relaxed) > limit_) {
break;
}
auto forwardPath = leftPath;
auto backwardPath = rightPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
cnt_.fetch_add(1, std::memory_order_relaxed);
if (cnt_.load(std::memory_order_relaxed) > limit_) {
break;
}
}
}
};
Expand Down
102 changes: 102 additions & 0 deletions src/graph/optimizer/rule/PushLimitDownShortestPathRule.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/optimizer/rule/PushLimitDownShortestPathRule.h"

#include "graph/optimizer/OptContext.h"
#include "graph/optimizer/OptGroup.h"
#include "graph/planner/plan/Algo.h"
#include "graph/planner/plan/PlanNode.h"
#include "graph/planner/plan/Query.h"

using nebula::graph::BFSShortestPath;
using nebula::graph::Limit;
using nebula::graph::MultiShortestPath;
using nebula::graph::PlanNode;
using nebula::graph::QueryContext;

namespace nebula {
namespace opt {
// transform Limit->DataCollect->Loop->BFS/MultiShortest
// to DataCollect->Loop->BFS/MultiShortest(limit)
std::unique_ptr<OptRule> PushLimitDownShortestPathRule::kInstance =
std::unique_ptr<PushLimitDownShortestPathRule>(new PushLimitDownShortestPathRule());

PushLimitDownShortestPathRule::PushLimitDownShortestPathRule() {
RuleSet::QueryRules().addRule(this);
}

const Pattern &PushLimitDownShortestPathRule::pattern() const {
static Pattern pattern = Pattern::create(
graph::PlanNode::Kind::kLimit,
{Pattern::create(
graph::PlanNode::Kind::kDataCollect,
{Pattern::create(graph::PlanNode::Kind::kLoop,
{Pattern::create({graph::PlanNode::Kind::kBFSShortest,
graph::PlanNode::Kind::kMultiShortestPath})})})});
return pattern;
}

StatusOr<OptRule::TransformResult> PushLimitDownShortestPathRule::transform(
OptContext *ctx, const MatchedResult &matched) const {
auto qctx = ctx->qctx();
auto limitGroupNode = matched.node;
auto dataCollectGroupNode = matched.dependencies.front().node;
auto loopGroupNode = matched.dependencies.front().dependencies.front().node;
auto pathGroupNode = matched.dependencies.front().dependencies.front().dependencies.front().node;

const auto limit = static_cast<const Limit *>(limitGroupNode->node());
const auto dataCollect = static_cast<const DataCollect *>(dataCollectGroupNode->node());
const auto loop = static_cast<const Loop *>(loopGroupNode->node());
auto pathNode = pathGroupNode->node();
auto pathKind = pathNode->kind();

int64_t pathLimit = -1;
if (pathKind == PlanNode::Kind::kBFSShortest) {
pathLimit = static_cast<const BFSShortestPath *>(pathNode)->limit();
} else {
pathLimit = static_cast<const MultiShortestPath *>(pathNode)->limit();
}

int64_t limitRows = limit->offset() + limit->count(qctx);
if (pathLimit >= 0 && limitRows >= pathLimit) {
return TransformResult::noTransform();
}

auto newDataCollect = static_cast<dataCollect *>(dataCollect->clone());
newDataCollect->setOutputVar(limit->outputVar());
auto newDataCollectGroupNode = OptGroupNode::create(ctx, newDataCollect, limitGroupNode->group());

auto newLoop = static_cast<Loop *>(loop->clone());
auto newLoopGroup = OptGroup::create(ctx);
auto newLoopGroupNode = newLoopGroup->makeGroupNode(newLoop);

auto newPath = static_cast<decltype(pathNode)>(pathNode->clone());
if (pathKind == PlanNode::Kind::kBFSShortest) {
static_cast<BFSShortestPath *>(newPath)->setLimit(limitRows);
} else {
static_cast<MultiShortestPath *>(newPath)->setLimit(limitRows);
}
auto newPathGroup = OptGroup::create(ctx);
auto newPathGroupNode = newPathGroup->makeGroupNode(newPath);

newDataCollectGroupNode->dependsOn(newLoopGroup);

for (auto dep : pathGroupNode->dependencies()) {
newPathGroupNode->dependsOn(dep);
}

TransformResult result;
result.eraseAll = true;
result.newGroupNodes.emplace_back(newPathGroupNode);
return result;
}

std::string PushLimitDownShortestPathRule::toString() const {
return "PushLimitDownShortestPathRule";
}

} // namespace opt
} // namespace nebula
72 changes: 72 additions & 0 deletions src/graph/optimizer/rule/PushLimitDownShortestPathRule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef GRAPH_OPTIMIZER_RULE_PUSHLIMITDOWNSHORTESTPATHRULE_H
#define GRAPH_OPTIMIZER_RULE_PUSHLIMITDOWNSHORTESTPATHRULE_H

#include "graph/optimizer/OptRule.h"

namespace nebula {
namespace opt {

// Push [[Limit]] down [[BFSShortestPath / MulitShortestPath]]
// Required conditions:
// 1. Match the pattern
// Benefits:
// 1. Limit data early to optimize performance
//
// Tranformation:
// Before:
//
// +--------+--------+
// | Limit |
// | (limit=3) |
// +--------+--------+
// |
// +---------+---------+
// | DataCollect |
// +---------+---------+
// |
// +---------+---------+
// | Loop |
// +---------+---------+
// |
// +---------+---------+
// | ShortestPath |
// +---------+---------+
//
// After:
//
// +---------+---------+
// | DataCollect |
// +---------+---------+
// |
// +---------+---------+
// | Loop |
// +---------+---------+
// |
// +--------+--------+
// | ShortestPaths |
// | (limit=3) |
// +--------+--------+

class PushLimitDownShortestPathRule final : public OptRule {
public:
const Pattern &pattern() const override;

StatusOr<OptRule::TransformResult> transform(OptContext *ctx,
const MatchedResult &matched) const override;

std::string toString() const override;

private:
PushLimitDownShortestPathRule();

static std::unique_ptr<OptRule> kInstance;
};

} // namespace opt
} // namespace nebula
#endif
2 changes: 2 additions & 0 deletions src/graph/planner/ngql/PathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ SubPlan PathPlanner::singlePairPlan(PlanNode* left, PlanNode* right) {
path->setRightVidVar(pathCtx_->toVidsVar);
path->setColNames({kPathStr});
path->setTerminateEarlyVar(terminateEarlyVar);
path->setLimit(pathCtx_->limit);

auto* loopCondition = singlePairLoopCondition(steps, path->outputVar(), terminateEarlyVar);
auto* loop = Loop::make(qctx, nullptr, path, loopCondition);
Expand Down Expand Up @@ -221,6 +222,7 @@ SubPlan PathPlanner::multiPairPlan(PlanNode* left, PlanNode* right) {
path->setRightVidVar(pathCtx_->toVidsVar);
path->setTerminationVar(terminationVar);
path->setColNames({kPathStr});
path->setLimit(pathCtx_->limit);

SubPlan loopDep = loopDepPlan();
auto* loopCondition = multiPairLoopCondition(steps, terminationVar);
Expand Down
1 change: 0 additions & 1 deletion src/graph/planner/plan/Algo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "graph/util/ToJson.h"
namespace nebula {
namespace graph {

PlanNode* AllPaths::clone() const {
auto* path = AllPaths::make(qctx_, nullptr, nullptr, space_, steps_, noLoop_, withProp_);
path->cloneMembers(*this);
Expand Down
4 changes: 4 additions & 0 deletions src/graph/validator/FindPathValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ Status FindPathValidator::validateImpl() {
NG_RETURN_IF_ERROR(validateWhere(fpSentence->where()));
NG_RETURN_IF_ERROR(ValidateUtil::validateStep(fpSentence->step(), pathCtx_->steps));
NG_RETURN_IF_ERROR(validateYield(fpSentence->yield()));
auto limitClause = fpSentence->limit();
if (limitClause != nullptr) {
pathCtx_->limit = limitClause->limit();
}

return Status::OK();
}
Expand Down
15 changes: 15 additions & 0 deletions src/parser/Clauses.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@
#include "interface/gen-cpp2/storage_types.h"

namespace nebula {
class LimitClause final {
public:
explicit LimitClause(int64_t limit) : limit_(limit) {}

std::string toString() const {
return folly::stringPrintf("LIMIT %ld", limit_);
}

int64_t limit() const {
return limit_;
}

private:
int64_t limit_{-1};
};
class StepClause final {
public:
explicit StepClause(uint32_t steps = 1) {
Expand Down
4 changes: 4 additions & 0 deletions src/parser/TraverseSentences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ std::string FindPathSentence::toString() const {
buf += " ";
buf += yield_->toString();
}
if (limit_ != nullptr) {
buf += " ";
buf += limit_->toString();
}
return buf;
}

Expand Down
9 changes: 9 additions & 0 deletions src/parser/TraverseSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ class FindPathSentence final : public Sentence {
noLoop_ = noLoop;
}

void setLimit(LimitClause* clause) {
limit_.reset(clause);
}

void setFrom(FromClause* clause) {
from_.reset(clause);
}
Expand Down Expand Up @@ -469,6 +473,10 @@ class FindPathSentence final : public Sentence {
return where_.get();
}

LimitClause* limit() const {
return limit_.get();
}

YieldClause* yield() const {
return yield_.get();
}
Expand Down Expand Up @@ -497,6 +505,7 @@ class FindPathSentence final : public Sentence {
std::unique_ptr<StepClause> step_;
std::unique_ptr<WhereClause> where_;
std::unique_ptr<YieldClause> yield_;
std::unique_ptr<LimitClause> limit_;
};

class LimitSentence final : public Sentence {
Expand Down
Loading

0 comments on commit 5929ab3

Please sign in to comment.