Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Traverse executor in parallel #5314

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/graph/context/iterator/GetNbrsRespDataSetIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class GetNbrsRespDataSetIter final {
const Value& src,
const std::string& edgeName) const;

// my fields
const DataSet* dataset_;
size_t curRowIdx_;

Expand Down
204 changes: 144 additions & 60 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include "graph/util/SchemaUtil.h"
#include "graph/util/Utils.h"

DEFINE_uint64(traverse_parallel_threshold_rows,
150000,
"threshold row number of traverse executor in parallel");

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetNeighborsResponse;
Expand All @@ -25,15 +29,7 @@ folly::Future<Status> TraverseExecutor::execute() {
DataSet emptyDs;
return finish(ResultBuilder().value(Value(std::move(emptyDs))).build());
}
return getNeighbors().ensure([this]() {
// fill some profile time stats
if (expandTime_) {
otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime_));
}
if (expandOneStepTime_) {
otherStats_.emplace("expandOneStepTime", folly::sformat("{}(us)", expandOneStepTime_));
}
});
return getNeighbors();
}

Status TraverseExecutor::buildRequestVids() {
Expand Down Expand Up @@ -109,7 +105,17 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
vids_.clear();
SCOPED_TIMER(&execTime_);
addStats(resp, getNbrTime.elapsedInUSec());
return handleResponse(std::move(resp));
time::Duration expandTime;
return handleResponse(std::move(resp)).ensure([this, expandTime]() {
otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime.elapsedInUSec()));
});
})
.thenValue([this](Status s) -> folly::Future<Status> {
NG_RETURN_IF_ERROR(s);
if (!isFinalStep() && !vids_.empty()) {
return getNeighbors();
}
return buildResult();
});
}

Expand Down Expand Up @@ -157,77 +163,155 @@ size_t TraverseExecutor::numRowsOfRpcResp(const RpcResponse& resps) const {
return numRows;
}

void TraverseExecutor::expandOneStep(const RpcResponse& resps) {
SCOPED_TIMER(&expandOneStepTime_);
initVertices_.reserve(numRowsOfRpcResp(resps));
template <typename T>
size_t sizeOf(const std::vector<T>& v) {
size_t sz = 0u;
for (auto& e : v) {
sz += e.size();
}
return sz;
}

for (const auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
for (GetNbrsRespDataSetIter iter(dataset); iter.valid(); iter.next()) {
Value v = iter.getVertex();
initVertices_.emplace_back(v);
VidHashSet dstSet;
auto adjEdges = iter.getAdjEdges(&dstSet);
for (const Value& dst : dstSet) {
if (adjList_.find(dst) == adjList_.end()) {
vids_.emplace(dst);
}
}
DCHECK(adjList_.find(v) == adjList_.end())
<< "The adjacency list should not contain the source vertex";
adjList_.emplace(v, std::move(adjEdges));
void TraverseExecutor::buildAdjList(DataSet& dataset,
std::vector<Value>& initVertices,
VidHashSet& vids,
VertexMap<Value>& adjList) const {
for (GetNbrsRespDataSetIter iter(&dataset); iter.valid(); iter.next()) {
Value v = iter.getVertex();
initVertices.emplace_back(v);
VidHashSet dstSet;
auto adjEdges = iter.getAdjEdges(&dstSet);
for (const Value& dst : dstSet) {
if (adjList_.find(dst) == adjList_.end()) {
vids.emplace(dst);
}
}
DCHECK(adjList_.find(v) == adjList_.end())
<< "The adjacency list should not contain the source vertex: " << v;
adjList.emplace(v, std::move(adjEdges));
}
}

if (range_.min() == 0) {
result_.rows = buildZeroStepPath();
folly::Future<Status> TraverseExecutor::asyncExpandOneStep(RpcResponse&& resps) {
size_t numResps = resps.responses().size();
auto initVerticesList = std::make_shared<std::vector<std::vector<Value>>>(numResps);
auto vidsList = std::make_shared<std::vector<VidHashSet>>(numResps);
auto adjLists = std::make_shared<std::vector<VertexMap<Value>>>(numResps);
auto taskRunTime = std::make_shared<std::vector<size_t>>(numResps, 0u);

std::vector<folly::Future<folly::Unit>> futures;
futures.reserve(numResps);

for (size_t i = 0; i < numResps; i++) {
auto dataset = resps.responses()[i].get_vertices();
if (!dataset) continue;
auto func = [this,
dataset = std::move(*dataset),
i,
initVerticesList,
vidsList,
adjLists,
taskRunTime]() mutable {
SCOPED_TIMER(&((*taskRunTime)[i]));
buildAdjList(dataset, (*initVerticesList)[i], (*vidsList)[i], (*adjLists)[i]);
};
futures.emplace_back(folly::via(runner(), std::move(func)));
}

return folly::collect(futures).via(runner()).thenValue(
[this, initVerticesList, vidsList, adjLists, taskRunTime](std::vector<folly::Unit>&&) {
time::Duration postTaskTime;
initVertices_.reserve(sizeOf(*initVerticesList));
for (auto& initVertices : *initVerticesList) {
std::move(initVertices.begin(), initVertices.end(), std::back_inserter(initVertices_));
}

vids_.reserve(sizeOf(*vidsList));
for (auto& vids : *vidsList) {
for (auto& v : vids) {
vids_.emplace(std::move(v));
}
}

adjList_.reserve(adjList_.size() + sizeOf(*adjLists));
for (auto& adjList : *adjLists) {
for (auto& p : adjList) {
adjList_.emplace(std::move(p.first), std::move(p.second));
}
}

auto t = postTaskTime.elapsedInUSec();
otherStats_.emplace("expandPostTaskTime", folly::sformat("{}(us)", t));
folly::dynamic taskRunTimeArray = folly::dynamic::array();
for (auto time : *taskRunTime) {
taskRunTimeArray.push_back(time);
}
otherStats_.emplace("expandTaskRunTime", folly::toPrettyJson(taskRunTimeArray));

return Status::OK();
});
}

folly::Future<Status> TraverseExecutor::expandOneStep(RpcResponse&& resps) {
auto numRows = numRowsOfRpcResp(resps);
if (numRows < FLAGS_traverse_parallel_threshold_rows) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

< ???

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When return too many vertices from storage, the collect task will speed a lot time. so not use the parallel execution in that case.

return asyncExpandOneStep(std::move(resps));
}

initVertices_.reserve(numRows);
for (auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
buildAdjList(*dataset, initVertices_, vids_, adjList_);
}
}

return Status::OK();
}

folly::Future<Status> TraverseExecutor::handleResponse(RpcResponse&& resps) {
NG_RETURN_IF_ERROR(handleCompleteness(resps, FLAGS_accept_partial_success));

if (currentStep_ == 1 && !traverse_->eFilter() && !traverse_->vFilter()) {
expandOneStep(resps);
} else {
List list;
for (auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
list.values.emplace_back(std::move(*dataset));
}
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
if (currentStep_ == 1) {
initVertices_.reserve(iter->numRows());
auto vertices = iter->getVertices();
// match (v)-[e:Rel]-(v1:Label1)-[e1*2]->() where id(v0) in [6, 23] return v1
// save the vertex that meets the filter conditions as the starting vertex of the current
// traverse
for (auto& vertex : vertices.values) {
if (vertex.isVertex()) {
initVertices_.emplace_back(vertex);
}
}
return expandOneStep(std::move(resps)).thenValue([this](Status s) {
NG_RETURN_IF_ERROR(s);
if (range_.min() == 0) {
result_.rows = buildZeroStepPath();
}
}

expand(iter.get());
return Status::OK();
});
}

if (!isFinalStep() && !vids_.empty()) {
return getNeighbors();
List list;
for (auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
list.values.emplace_back(std::move(*dataset));
}
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
if (currentStep_ == 1) {
initVertices_.reserve(iter->numRows());
auto vertices = iter->getVertices();
// match (v)-[e:Rel]-(v1:Label1)-[e1*2]->() where id(v0) in [6, 23] return v1
// save the vertex that meets the filter conditions as the starting vertex of the current
// traverse
for (auto& vertex : vertices.values) {
if (vertex.isVertex()) {
initVertices_.emplace_back(vertex);
}
}
if (range_.min() == 0) {
result_.rows = buildZeroStepPath();
}
}
return buildResult();

expand(iter.get());
return Status::OK();
}

void TraverseExecutor::expand(GetNeighborsIter* iter) {
SCOPED_TIMER(&expandTime_);
if (iter->numRows() == 0) {
return;
}
Expand Down
10 changes: 6 additions & 4 deletions src/graph/executor/query/TraverseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ class TraverseExecutor final : public StorageAccessExecutor {
size_t numRowsOfRpcResp(const RpcResponse& resps) const;

void expand(GetNeighborsIter* iter);
void expandOneStep(const RpcResponse& resps);
void buildAdjList(DataSet& dataset,
std::vector<Value>& initVertices,
VidHashSet& vids,
VertexMap<Value>& adjList) const;
folly::Future<Status> expandOneStep(RpcResponse&& resps);
folly::Future<Status> asyncExpandOneStep(RpcResponse&& resps);
folly::Future<Status> handleResponse(RpcResponse&& resps);

folly::Future<Status> buildResult();
Expand Down Expand Up @@ -97,9 +102,6 @@ class TraverseExecutor final : public StorageAccessExecutor {
const Traverse* traverse_{nullptr};
MatchStepRange range_;
size_t currentStep_{0};

size_t expandTime_{0u};
size_t expandOneStepTime_{0u};
};

} // namespace graph
Expand Down