Skip to content

Commit

Permalink
Traverse executor in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Feb 6, 2023
1 parent fa117fe commit fe650f4
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 50 deletions.
1 change: 0 additions & 1 deletion src/graph/context/iterator/GetNbrsRespDataSetIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class GetNbrsRespDataSetIter final {
const Value& src,
const std::string& edgeName) const;

// my fields
const DataSet* dataset_;
size_t curRowIdx_;

Expand Down
183 changes: 138 additions & 45 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include "graph/util/SchemaUtil.h"
#include "graph/util/Utils.h"

DEFINE_uint64(traverse_parallel_threshold_rows,
150000,
"threshold row number of traverse executor in parallel");

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetNeighborsResponse;
Expand All @@ -25,15 +29,7 @@ folly::Future<Status> TraverseExecutor::execute() {
DataSet emptyDs;
return finish(ResultBuilder().value(Value(std::move(emptyDs))).build());
}
return getNeighbors().ensure([this]() {
// fill some profile time stats
if (expandTime_) {
otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime_));
}
if (expandOneStepTime_) {
otherStats_.emplace("expandOneStepTime", folly::sformat("{}(us)", expandOneStepTime_));
}
});
return getNeighbors();
}

Status TraverseExecutor::buildRequestVids() {
Expand Down Expand Up @@ -109,7 +105,17 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
vids_.clear();
SCOPED_TIMER(&execTime_);
addStats(resp, getNbrTime.elapsedInUSec());
return handleResponse(std::move(resp));
time::Duration expandTime;
return handleResponse(std::move(resp)).ensure([this, expandTime]() {
otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime.elapsedInUSec()));
});
})
.thenValue([this](Status s) -> folly::Future<Status> {
NG_RETURN_IF_ERROR(s);
if (!isFinalStep() && !vids_.empty()) {
return getNeighbors();
}
return buildResult();
});
}

Expand Down Expand Up @@ -157,10 +163,98 @@ size_t TraverseExecutor::numRowsOfRpcResp(const RpcResponse& resps) const {
return numRows;
}

void TraverseExecutor::expandOneStep(const RpcResponse& resps) {
SCOPED_TIMER(&expandOneStepTime_);
initVertices_.reserve(numRowsOfRpcResp(resps));
template <typename T>
size_t sizeOf(const std::vector<T>& v) {
size_t sz = 0u;
for (auto& e : v) {
sz += e.size();
}
return sz;
}

folly::Future<Status> TraverseExecutor::asyncExpandOneStep(RpcResponse&& resps) {
size_t numResps = resps.responses().size();
auto initVerticesList = std::make_shared<std::vector<std::vector<Value>>>(numResps);
auto vidsList = std::make_shared<std::vector<VidHashSet>>(numResps);
auto adjLists = std::make_shared<std::vector<VertexMap<Value>>>(numResps);
auto taskRunTime = std::make_shared<std::vector<size_t>>(numResps, 0u);

std::vector<folly::Future<folly::Unit>> futures;
futures.reserve(numResps);

for (size_t i = 0; i < numResps; i++) {
auto dataset = resps.responses()[i].get_vertices();
if (!dataset) continue;
auto func = [this,
dataset = std::move(*dataset),
i,
initVerticesList,
vidsList,
adjLists,
taskRunTime]() {
SCOPED_TIMER(&((*taskRunTime)[i]));
auto& initVertices = (*initVerticesList)[i];
auto& vids = (*vidsList)[i];
auto& adjList = (*adjLists)[i];
for (GetNbrsRespDataSetIter iter(&dataset); iter.valid(); iter.next()) {
Value v = iter.getVertex();
initVertices.emplace_back(v);
VidHashSet dstSet;
auto adjEdges = iter.getAdjEdges(&dstSet);
for (const Value& dst : dstSet) {
if (adjList_.find(dst) == adjList_.end()) {
vids.emplace(dst);
}
}
DCHECK(adjList_.find(v) == adjList_.end())
<< "The adjacency list should not contain the source vertex: " << v;
adjList.emplace(v, std::move(adjEdges));
}
};
futures.emplace_back(folly::via(runner(), std::move(func)));
}

return folly::collect(futures).via(runner()).thenValue(
[this, initVerticesList, vidsList, adjLists, taskRunTime](std::vector<folly::Unit>&&) {
time::Duration postTaskTime;
initVertices_.reserve(sizeOf(*initVerticesList));
for (auto& initVertices : *initVerticesList) {
std::move(initVertices.begin(), initVertices.end(), std::back_inserter(initVertices_));
}

vids_.reserve(sizeOf(*vidsList));
for (auto& vids : *vidsList) {
for (auto& v : vids) {
vids_.emplace(std::move(v));
}
}

adjList_.reserve(adjList_.size() + sizeOf(*adjLists));
for (auto& adjList : *adjLists) {
for (auto& p : adjList) {
adjList_.emplace(std::move(p.first), std::move(p.second));
}
}

auto t = postTaskTime.elapsedInUSec();
otherStats_.emplace("expandPostTaskTime", folly::sformat("{}(us)", t));
folly::dynamic taskRunTimeArray = folly::dynamic::array();
for (auto time : *taskRunTime) {
taskRunTimeArray.push_back(time);
}
otherStats_.emplace("expandTaskRunTime", folly::toPrettyJson(taskRunTimeArray));

return Status::OK();
});
}

folly::Future<Status> TraverseExecutor::expandOneStep(RpcResponse&& resps) {
auto numRows = numRowsOfRpcResp(resps);
if (numRows < FLAGS_traverse_parallel_threshold_rows) {
return asyncExpandOneStep(std::move(resps));
}

initVertices_.reserve(numRows);
for (const auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
Expand All @@ -181,53 +275,52 @@ void TraverseExecutor::expandOneStep(const RpcResponse& resps) {
}
}

if (range_.min() == 0) {
result_.rows = buildZeroStepPath();
}
return Status::OK();
}

folly::Future<Status> TraverseExecutor::handleResponse(RpcResponse&& resps) {
NG_RETURN_IF_ERROR(handleCompleteness(resps, FLAGS_accept_partial_success));

if (currentStep_ == 1 && !traverse_->eFilter() && !traverse_->vFilter()) {
expandOneStep(resps);
} else {
List list;
for (auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
list.values.emplace_back(std::move(*dataset));
}
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
if (currentStep_ == 1) {
initVertices_.reserve(iter->numRows());
auto vertices = iter->getVertices();
// match (v)-[e:Rel]-(v1:Label1)-[e1*2]->() where id(v0) in [6, 23] return v1
// save the vertex that meets the filter conditions as the starting vertex of the current
// traverse
for (auto& vertex : vertices.values) {
if (vertex.isVertex()) {
initVertices_.emplace_back(vertex);
}
}
return expandOneStep(std::move(resps)).thenValue([this](Status s) {
NG_RETURN_IF_ERROR(s);
if (range_.min() == 0) {
result_.rows = buildZeroStepPath();
}
}

expand(iter.get());
return Status::OK();
});
}

if (!isFinalStep() && !vids_.empty()) {
return getNeighbors();
List list;
for (auto& resp : resps.responses()) {
auto dataset = resp.get_vertices();
if (dataset) {
list.values.emplace_back(std::move(*dataset));
}
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
if (currentStep_ == 1) {
initVertices_.reserve(iter->numRows());
auto vertices = iter->getVertices();
// match (v)-[e:Rel]-(v1:Label1)-[e1*2]->() where id(v0) in [6, 23] return v1
// save the vertex that meets the filter conditions as the starting vertex of the current
// traverse
for (auto& vertex : vertices.values) {
if (vertex.isVertex()) {
initVertices_.emplace_back(vertex);
}
}
if (range_.min() == 0) {
result_.rows = buildZeroStepPath();
}
}
return buildResult();

expand(iter.get());
return Status::OK();
}

void TraverseExecutor::expand(GetNeighborsIter* iter) {
SCOPED_TIMER(&expandTime_);
if (iter->numRows() == 0) {
return;
}
Expand Down
6 changes: 2 additions & 4 deletions src/graph/executor/query/TraverseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class TraverseExecutor final : public StorageAccessExecutor {
size_t numRowsOfRpcResp(const RpcResponse& resps) const;

void expand(GetNeighborsIter* iter);
void expandOneStep(const RpcResponse& resps);
folly::Future<Status> expandOneStep(RpcResponse&& resps);
folly::Future<Status> asyncExpandOneStep(RpcResponse&& resps);
folly::Future<Status> handleResponse(RpcResponse&& resps);

folly::Future<Status> buildResult();
Expand Down Expand Up @@ -97,9 +98,6 @@ class TraverseExecutor final : public StorageAccessExecutor {
const Traverse* traverse_{nullptr};
MatchStepRange range_;
size_t currentStep_{0};

size_t expandTime_{0u};
size_t expandOneStepTime_{0u};
};

} // namespace graph
Expand Down

0 comments on commit fe650f4

Please sign in to comment.