Skip to content

Commit

Permalink
concurrent execute
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Mar 31, 2022
1 parent 7660eeb commit ff044d1
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 360 deletions.
192 changes: 114 additions & 78 deletions src/graph/executor/algo/ShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,72 +12,104 @@ folly::Future<Status> ShortestPathExecutor::execute() {
SCOPED_TIMER(&execTime_);
single_ = pathNode_->single();
range_ = {pathNode_->stepRange()->min(), pathNode_->stepRange()->max()};
NG_RETURN_IF_ERROR(buildRequestDataSet());
auto& colNames = pathNode_->colNames();
auto rowSize = buildRequestDataSet();
std::vector<folly::Future<Status>> futures;
for (size_t i = 0; i < cartesianProduct_.size(); ++i) {
futures.emplace_back(shortestPath(i));
for (size_t rowNum = 0; rowNum < rowSize; ++rowNum) {
resultDs_[rowNum].colNames = colNames;
futures.emplace_back(shortestPath(rowNum, 0));
}
return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
return folly::collect(futures).via(runner()).thenValue([this, &colNames](auto&& resps) {
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
return finish(ResultBuilder().value(Value(std::move(resultDs_))).build());
// append dataset
DataSet result;
result.colNames = colNames;
for (auto& ds : resultDs_) {
result.append(std::move(ds));
}
return finish(ResultBuilder().value(Value(std::move(result))).build());
});
}

Status ShortestPathExecutor::buildRequestDataSet() {
size_t ShortestPathExecutor::buildRequestDataSet() {
auto iter = ectx_->getResult(pathNode_->inputVar()).iter();
cartesianProduct_.reserve(iter->size());
const auto& vidType = *(qctx()->rctx()->session()->space().spaceDesc.vid_type_ref());
auto rowSize = iter->size();
leftVids_.reserve(rowSize);
rightVids_.reserve(rowSize);
leftVisitedVids_.resize(rowSize);
rightVisitedVids_.resize(rowSize);
allLeftSteps_.resize(rowSize);
allRightSteps_.reserve(rowSize);
resultDs_.resize(rowSize);

const auto& vidType = *(qctx()->rctx()->session()->space().spaceDesc.vid_type_ref());
std::unordered_set<std::pair<Value, Value>> uniqueSet;
uniqueSet.reserve(iter->size());
uniqueSet.reserve(rowSize);
for (; iter->valid(); iter->next()) {
auto start = iter->getColumn(0);
auto end = iter->getColumn(1);
if (!SchemaUtil::isValidVid(start, vidType) || !SchemaUtil::isValidVid(end, vidType)) {
return Status::Error("Mismatched vid type, space vid type: %s",
SchemaUtil::typeToString(vidType).c_str());
LOG(ERROR) << "Mismatched vid type. start type : " << start.type()
<< ", end type: " << end.type()
<< ", space vid type: " << SchemaUtil::typeToString(vidType);
continue;
}
if (start == end) {
// continue or return error
continue;
}
if (uniqueSet.emplace(std::pair<Value, Value>{start, end}).second) {
cartesianProduct_.emplace_back(std::move(start), std::move(end));
// set origin rightStep_;
std::unordered_map<Value, std::vector<Row>> steps;
std::vector<Row> dummy;
steps.emplace(end, std::move(dummy));
std::vector<std::unordered_map<Value, std::vector<Row>>> originRightStep({std::move(steps)});
allRightSteps_.emplace_back(std::move(originRightStep));

DataSet startDs, endDs;
startDs.rows.emplace_back(Row({std::move(start)}));
endDs.rows.emplace_back(Row({std::move(end)}));
leftVids_.emplace_back(std::move(startDs));
rightVids_.emplace_back(std::move(endDs));
}
}
if (!cartesianProduct_.empty()) {
leftVids_.rows.emplace_back(Row({std::move(cartesianProduct_.front().first)}));
rightVids_.rows.emplace_back(Row({std::move(cartesianProduct_.front().second)}));
}
return Status::OK();
return rowSize;
}

folly::Future<Status> ShortestPathExecutor::shortestPath(size_t i) {
folly::Future<Status> ShortestPathExecutor::shortestPath(size_t rowNum, size_t stepNum) {
VLOG(1) << "current rowNum is : " << rowNum << " step is :" << stepNum;
std::vector<folly::Future<Status>> futures;
futures.emplace_back(getNeighbors(false));
futures.emplace_back(getNeighbors(true));
return folly::collect(futures).via(runner()).thenValue([this, i](auto&& resps) {
futures.emplace_back(getNeighbors(rowNum, false));
futures.emplace_back(getNeighbors(rowNum, true));
return folly::collect(futures).via(runner()).thenValue([this, rowNum, stepNum](auto&& resps) {
for (auto& resp : resps) {
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
}
return handleResponse(i);
return handleResponse(rowNum, stepNum);
});
}

folly::Future<Status> ShortestPathExecutor::getNeighbors(bool reverse) {
folly::Future<Status> ShortestPathExecutor::getNeighbors(size_t rowNum, bool reverse) {
if (reverse) {
VLOG(1) << "reverse GetNeightbor input : " << rightVids_[rowNum].toString();
} else {
VLOG(1) << "GetNeightbor input : " << leftVids_[rowNum].toString();
}
StorageClient* storageClient = qctx_->getStorageClient();
time::Duration getNbrTime;
storage::StorageClient::CommonRequestParam param(pathNode_->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
auto& inputRows = reverse ? rightVids_[rowNum].rows : leftVids_[rowNum].rows;
return storageClient
->getNeighbors(param,
{nebula::kVid},
reverse ? std::move(rightVids_.rows) : std::move(leftVids_.rows),
std::move(inputRows),
{},
pathNode_->edgeDirection(),
nullptr,
Expand All @@ -90,43 +122,47 @@ folly::Future<Status> ShortestPathExecutor::getNeighbors(bool reverse) {
-1,
nullptr)
.via(runner())
.thenValue([this, reverse](auto&& resp) { return buildPath(std::move(resp), reverse); });
.ensure([this, rowNum, getNbrTime]() {
SCOPED_TIMER(&execTime_);
// otherStats_.emplace("rowNum", rowNum);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec()));
})
.thenValue([this, rowNum, reverse](auto&& resp) {
SCOPED_TIMER(&execTime_);
addStats(resp, otherStats_);
return buildPath(rowNum, std::move(resp), reverse);
});
}

folly::Future<Status> ShortestPathExecutor::handleResponse(size_t i) {
if (conjunctPath()) {
if (i < cartesianProduct_.size() - 1) {
leftVids_.rows = {Row({std::move(cartesianProduct_[i + 1].first)})};
rightVids_.rows = {Row({std::move(cartesianProduct_[i + 1].second)})};
}
leftVisitedVids_.clear();
rightVisitedVids_.clear();
allLeftSteps_.clear();
allRightSteps_.clear();
folly::Future<Status> ShortestPathExecutor::handleResponse(size_t rowNum, size_t stepNum) {
if (conjunctPath(rowNum, stepNum)) {
return Status::OK();
}
step_++;
if (step_ > 10) {
stepNum++;
if (stepNum * 2 >= range_.second) {
return Status::OK();
}
return shortestPath(i);
return shortestPath(rowNum, stepNum);
}

bool ShortestPathExecutor::conjunctPath() {
const auto& leftStep = allLeftSteps_.back();
const auto& prevRightStep = allRightSteps_[step_ - 1];
bool ShortestPathExecutor::conjunctPath(size_t rowNum, size_t stepNum) {
const auto& leftStep = allLeftSteps_[rowNum].back();
const auto& prevRightStep = allRightSteps_[rowNum][stepNum];
std::vector<Value> meetVids;
for (const auto& step : leftStep) {
if (prevRightStep.find(step.first) != prevRightStep.end()) {
meetVids.push_back(step.first);
}
}
if (!meetVids.empty()) {
buildOddPath(meetVids);
buildOddPath(rowNum, meetVids);
return true;
}
if (stepNum * 2 >= range_.second) {
return false;
}

const auto& rightStep = allRightSteps_.back();
const auto& rightStep = allRightSteps_[rowNum].back();
for (const auto& step : leftStep) {
if (rightStep.find(step.first) != rightStep.end()) {
meetVids.push_back(step.first);
Expand All @@ -135,10 +171,10 @@ bool ShortestPathExecutor::conjunctPath() {
if (meetVids.empty()) {
return false;
}
return buildEvenPath(meetVids);
return buildEvenPath(rowNum, meetVids);
}

Status ShortestPathExecutor::buildPath(RpcResponse&& resps, bool reverse) {
Status ShortestPathExecutor::buildPath(size_t rowNum, RpcResponse&& resps, bool reverse) {
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
NG_RETURN_IF_ERROR(result);
auto& responses = std::move(resps).responses();
Expand All @@ -151,26 +187,21 @@ Status ShortestPathExecutor::buildPath(RpcResponse&& resps, bool reverse) {
}
list.values.emplace_back(std::move(*dataset));
}
if (reverse) {
VLOG(1) << "reverse GetNeightbor output: " << list.toString().c_str();
} else {
VLOG(1) << "GetNeightbor output: " << list.toString().c_str();
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
return doBuildPath(iter.get(), reverse);
return doBuildPath(rowNum, iter.get(), reverse);
}

Status ShortestPathExecutor::doBuildPath(GetNeighborsIter* iter, bool reverse) {
Status ShortestPathExecutor::doBuildPath(size_t rowNum, GetNeighborsIter* iter, bool reverse) {
auto iterSize = iter->size();
auto& visitedVids = reverse ? rightVisitedVids_ : leftVisitedVids_;
auto& visitedVids = reverse ? rightVisitedVids_[rowNum] : leftVisitedVids_[rowNum];
visitedVids.reserve(visitedVids.size() + iterSize);
auto& allSteps = reverse ? allRightSteps_ : allLeftSteps_;
if (reverse && step_ == 0) {
std::unordered_map<Value, std::vector<Row>> steps;
for (; iter->valid(); iter->next()) {
// set oright rightPath_;
auto& vid = iter->getColumn(0);
std::vector<Row> dummpy;
steps.emplace(vid, std::move(dummpy));
}
allSteps.emplace_back(std::move(steps));
}
auto& allSteps = reverse ? allRightSteps_[rowNum] : allLeftSteps_[rowNum];
allSteps.emplace_back();
auto& currentStep = allSteps.back();

Expand Down Expand Up @@ -224,63 +255,66 @@ Status ShortestPathExecutor::doBuildPath(GetNeighborsIter* iter, bool reverse) {
visitedVids.insert(std::make_move_iterator(uniqueDst.begin()),
std::make_move_iterator(uniqueDst.end()));
if (reverse) {
rightVids_.rows.swap(nextStepVids);
rightVids_[rowNum].rows.swap(nextStepVids);
VLOG(1) << "reverse next Vid : " << rightVids_[rowNum].toString() << " rowNum" << rowNum;
} else {
leftVids_.rows.swap(nextStepVids);
leftVids_[rowNum].rows.swap(nextStepVids);
VLOG(1) << "current next Vid : " << leftVids_[rowNum].toString() << " rowNum " << rowNum;
}
return Status::OK();
}

void ShortestPathExecutor::buildOddPath(const std::vector<Value>& meetVids) {
void ShortestPathExecutor::buildOddPath(size_t rowNum, const std::vector<Value>& meetVids) {
for (auto& meetVid : meetVids) {
auto leftPaths = createLeftPath(meetVid);
auto rightPaths = createRightPath(meetVid, true);
auto leftPaths = createLeftPath(rowNum, meetVid);
auto rightPaths = createRightPath(rowNum, meetVid, true);
for (auto& leftPath : leftPaths) {
for (auto& rightPath : rightPaths) {
Row path = leftPath;
auto& steps = path.values.back().mutableList().values;
steps.insert(steps.end(), rightPath.values.begin(), rightPath.values.end());
resultDs_.rows.emplace_back(std::move(path));
resultDs_[rowNum].rows.emplace_back(std::move(path));
}
}
}
}

bool ShortestPathExecutor::buildEvenPath(const std::vector<Value>& meetVids) {
bool ShortestPathExecutor::buildEvenPath(size_t rowNum, const std::vector<Value>& meetVids) {
std::vector<Value> meetVertices;
auto status = getMeetVidsProps(meetVids, meetVertices).get();
if (!status.ok() || meetVertices.empty()) {
return false;
}
for (auto& meetVertex : meetVertices) {
auto meetVid = meetVertex.getVertex().vid;
auto leftPaths = createLeftPath(meetVid);
auto rightPaths = createRightPath(meetVid, false);
auto leftPaths = createLeftPath(rowNum, meetVid);
auto rightPaths = createRightPath(rowNum, meetVid, false);
for (auto& leftPath : leftPaths) {
for (auto& rightPath : rightPaths) {
Row path = leftPath;
auto& steps = path.values.back().mutableList().values;
steps.emplace_back(meetVertex);
steps.insert(steps.end(), rightPath.values.begin(), rightPath.values.end());
resultDs_.rows.emplace_back(std::move(path));
resultDs_[rowNum].rows.emplace_back(std::move(path));
}
}
}
return true;
}

std::vector<Row> ShortestPathExecutor::createLeftPath(const Value& meetVid) {
auto& lastSteps = allLeftSteps_.back();
std::vector<Row> ShortestPathExecutor::createLeftPath(size_t rowNum, const Value& meetVid) {
auto& allSteps = allLeftSteps_[rowNum];
auto& lastSteps = allSteps.back();
auto findMeetVid = lastSteps.find(meetVid);
std::vector<Row> leftPaths(findMeetVid->second);
for (auto stepIter = allLeftSteps_.rbegin() + 1; stepIter != allLeftSteps_.rend(); ++stepIter) {
for (auto stepIter = allSteps.rbegin() + 1; stepIter != allSteps.rend(); ++stepIter) {
std::vector<Row> temp;
for (auto& leftPath : leftPaths) {
Value id = leftPath.values.front().getVertex().vid;
auto findId = stepIter->find(id);
for (auto& step : findId->second) {
auto newPath = leftPath;
newPath.values.insert(leftPath.values.begin(), step.values.begin(), step.values.end());
newPath.values.insert(newPath.values.begin(), step.values.begin(), step.values.end());
temp.emplace_back(std::move(newPath));
}
}
Expand All @@ -299,9 +333,12 @@ std::vector<Row> ShortestPathExecutor::createLeftPath(const Value& meetVid) {
return result;
}

std::vector<Row> ShortestPathExecutor::createRightPath(const Value& meetVid, bool oddStep) {
std::vector<Row> ShortestPathExecutor::createRightPath(size_t rowNum,
const Value& meetVid,
bool oddStep) {
auto& allSteps = allRightSteps_[rowNum];
std::vector<Row> rightPaths;
auto& lastSteps = allRightSteps_.back();
auto& lastSteps = allSteps.back();
if (oddStep) {
for (auto& steps : lastSteps) {
bool flag = false;
Expand All @@ -321,8 +358,7 @@ std::vector<Row> ShortestPathExecutor::createRightPath(const Value& meetVid, boo
auto findMeetVid = lastSteps.find(meetVid);
rightPaths = findMeetVid->second;
}
for (auto stepIter = allRightSteps_.rbegin() + 1; stepIter != allRightSteps_.rend() - 1;
++stepIter) {
for (auto stepIter = allSteps.rbegin() + 1; stepIter != allSteps.rend() - 1; ++stepIter) {
std::vector<Row> temp;
for (auto& rightPath : rightPaths) {
Value id = rightPath.values.front().getVertex().vid;
Expand Down Expand Up @@ -375,7 +411,7 @@ folly::Future<Status> ShortestPathExecutor::getMeetVidsProps(const std::vector<V
})
.thenValue([this, &meetVertices](PropRpcResponse&& resp) {
SCOPED_TIMER(&execTime_);
// addStats(resp, otherStats_);
addStats(resp, otherStats_);
return handlePropResp(std::move(resp), meetVertices);
});
}
Expand Down
Loading

0 comments on commit ff044d1

Please sign in to comment.