Skip to content

Commit

Permalink
Refactor all path (#5409)
Browse files Browse the repository at this point in the history
* add allpath

* fix error

* add comment

* fix missing attribute

* add pathbase

* add project for pathplan

* build runtime plan for path

* push filter down allpaths

* push limit down allPaths

* fix ctest error

* add stats info

* multi-thread build path

* fix error

* add debug info

* refactor all path

* concurrent build path

* fix noloop

* add comment

* delete pathbaseexecutor

* add memory check

---------

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
nevermore3 and Sophie-Xie authored Mar 23, 2023
1 parent 913cc8b commit af4ac39
Show file tree
Hide file tree
Showing 26 changed files with 1,230 additions and 850 deletions.
2 changes: 1 addition & 1 deletion src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ nebula_add_library(
query/PatternApplyExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
algo/ProduceAllPathsExecutor.cpp
algo/AllPathsExecutor.cpp
algo/ShortestPathExecutor.cpp
algo/CartesianProductExecutor.cpp
algo/SubgraphExecutor.cpp
Expand Down
6 changes: 3 additions & 3 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
#include "graph/executor/admin/SwitchSpaceExecutor.h"
#include "graph/executor/admin/UpdateUserExecutor.h"
#include "graph/executor/admin/ZoneExecutor.h"
#include "graph/executor/algo/AllPathsExecutor.h"
#include "graph/executor/algo/BFSShortestPathExecutor.h"
#include "graph/executor/algo/CartesianProductExecutor.h"
#include "graph/executor/algo/MultiShortestPathExecutor.h"
#include "graph/executor/algo/ProduceAllPathsExecutor.h"
#include "graph/executor/algo/ShortestPathExecutor.h"
#include "graph/executor/algo/SubgraphExecutor.h"
#include "graph/executor/logic/ArgumentExecutor.h"
Expand Down Expand Up @@ -461,8 +461,8 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kMultiShortestPath: {
return pool->makeAndAdd<MultiShortestPathExecutor>(node, qctx);
}
case PlanNode::Kind::kProduceAllPaths: {
return pool->makeAndAdd<ProduceAllPathsExecutor>(node, qctx);
case PlanNode::Kind::kAllPaths: {
return pool->makeAndAdd<AllPathsExecutor>(node, qctx);
}
case PlanNode::Kind::kCartesianProduct: {
return pool->makeAndAdd<CartesianProductExecutor>(node, qctx);
Expand Down
82 changes: 82 additions & 0 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "graph/context/Iterator.h"
#include "graph/context/QueryExpressionContext.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/SchemaUtil.h"
#include "graph/util/Utils.h"
#include "interface/gen-cpp2/meta_types.h"
Expand Down Expand Up @@ -148,5 +149,86 @@ StatusOr<std::vector<Value>> StorageAccessExecutor::buildRequestListByVidType(It
return internal::buildRequestList<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

bool StorageAccessExecutor::hasSameEdge(const std::vector<Value> &edgeList, const Edge &edge) {
for (auto &leftEdge : edgeList) {
if (!leftEdge.isEdge()) {
continue;
}
if (edge.keyEqual(leftEdge.getEdge())) {
return true;
}
}
return false;
}

folly::Future<std::vector<Value>> StorageAccessExecutor::getProps(
const std::vector<Value> &vids, const std::vector<VertexProp> *vertexPropPtr) {
nebula::DataSet vertices({kVid});
vertices.rows.reserve(vids.size());
for (auto &vid : vids) {
vertices.emplace_back(Row({vid}));
}
StorageClient *storageClient = qctx_->getStorageClient();
StorageClient::CommonRequestParam param(qctx_->rctx()->session()->space().id,
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());
return DCHECK_NOTNULL(storageClient)
->getProps(
param, std::move(vertices), vertexPropPtr, nullptr, nullptr, false, {}, -1, nullptr)
.via(runner())
.thenValue([this](PropRpcResponse &&resp) {
memory::MemoryCheckGuard guard;
addStats(resp);
return handlePropResp(std::move(resp));
});
}

std::vector<Value> StorageAccessExecutor::handlePropResp(PropRpcResponse &&resps) {
std::vector<Value> vertices;
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
LOG(WARNING) << "GetProp partial fail";
return vertices;
}
nebula::DataSet v;
for (auto &resp : resps.responses()) {
if (resp.props_ref().has_value()) {
if (UNLIKELY(!v.append(std::move(*resp.props_ref())))) {
// it's impossible according to the interface
LOG(WARNING) << "Heterogeneous props dataset";
}
} else {
LOG(WARNING) << "GetProp partial success";
}
}
auto val = std::make_shared<Value>(std::move(v));
auto iter = std::make_unique<PropIter>(val);
vertices.reserve(iter->size());
for (; iter->valid(); iter->next()) {
vertices.emplace_back(iter->getVertex());
}
return vertices;
}

void StorageAccessExecutor::addGetNeighborStats(RpcResponse &resp, size_t stepNum, bool reverse) {
folly::dynamic stats = folly::dynamic::array();
auto &hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto &result = resp.responses()[i];
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
stats.push_back(std::move(info));
}

auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum);
statsLock_.lock();
otherStats_.emplace(key, folly::toPrettyJson(stats));
statsLock_.unlock();
}

} // namespace graph
} // namespace nebula
19 changes: 19 additions & 0 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
#include "graph/executor/Executor.h"
#include "graph/util/Utils.h"

using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetNeighborsResponse;
using RpcResponse = StorageRpcResponse<GetNeighborsResponse>;
using PropRpcResponse = StorageRpcResponse<nebula::storage::cpp2::GetPropResponse>;
using VertexProp = nebula::storage::cpp2::VertexProp;
using nebula::storage::StorageClient;

namespace nebula {

class Expression;
Expand Down Expand Up @@ -167,6 +174,18 @@ class StorageAccessExecutor : public Executor {
Expression *expr,
bool dedup,
bool isCypher = false);

bool hasSameEdge(const std::vector<Value> &edgeList, const Edge &edge);

void addGetNeighborStats(RpcResponse &resp, size_t stepNum, bool reverse);

folly::Future<std::vector<Value>> getProps(const std::vector<Value> &vids,
const std::vector<VertexProp> *vertexPropPtr);

std::vector<Value> handlePropResp(PropRpcResponse &&resps);

protected:
folly::SpinLock statsLock_;
};

} // namespace graph
Expand Down
Loading

0 comments on commit af4ac39

Please sign in to comment.