Skip to content

Commit

Permalink
do not cache when last step
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Mar 12, 2023
1 parent e4715e6 commit 0ec5e90
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 18 deletions.
98 changes: 87 additions & 11 deletions src/graph/executor/query/ExpandAllExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ folly::Future<Status> ExpandAllExecutor::getNeighbors() {
.thenValue([this](Status s) -> folly::Future<Status> {
NG_RETURN_IF_ERROR(s);
if (qctx()->isKilled()) {
return Status::OK();
return Status::Error("Execution had been killed");
}
if (currentStep_ <= maxSteps_) {
if (!nextStepVids_.empty()) {
Expand All @@ -126,7 +126,7 @@ folly::Future<Status> ExpandAllExecutor::expandFromCache() {
for (; currentStep_ <= maxSteps_; ++currentStep_) {
time::Duration expandTime;
if (qctx()->isKilled()) {
return Status::OK();
return Status::Error("Execution had been killed");
}
curLimit_ = 0;
curMaxLimit_ =
Expand Down Expand Up @@ -183,7 +183,6 @@ void ExpandAllExecutor::getNeighborsFromCache(
break;
}
}

auto& dst = (*edgeIter).values.back();
if (adjList_.find(dst) == adjList_.end()) {
nextStepVids_.emplace(dst);
Expand All @@ -202,6 +201,66 @@ void ExpandAllExecutor::getNeighborsFromCache(
resetNextStepVids(visitedVids);
}

Status ExpandAllExecutor::handleLastStep(GetNeighborsIter* iter, std::vector<int64_t>& samples) {
QueryExpressionContext ctx(ectx_);
List curVertexProps;
Value curVid;
std::unordered_map<Value, std::unordered_set<Value>> dst2VidsMap;
std::unordered_set<Value> visitedVids;
if (iter->valid() && vertexColumns_) {
for (auto& col : vertexColumns_->columns()) {
Value val = col->expr()->eval(ctx(iter));
curVertexProps.values.emplace_back(std::move(val));
}
}
for (; iter->valid(); iter->next()) {
List edgeProps;
if (edgeColumns_) {
for (auto& col : edgeColumns_->columns()) {
Value val = col->expr()->eval(ctx(iter));
edgeProps.values.emplace_back(std::move(val));
}
}
const auto& vid = iter->getColumn(0);
curVid = curVid.empty() ? vid : curVid;
if (curVid != vid) {
curVid = vid;
if (vertexColumns_) {
for (auto& col : vertexColumns_->columns()) {
Value val = col->expr()->eval(ctx(iter));
curVertexProps.values.emplace_back(std::move(val));
}
}
}

if (sample_) {
if (samples.empty()) {
continue;
}
if (curLimit_++ != samples.back()) {
continue;
} else {
samples.pop_back();
}
} else {
if (curLimit_++ >= curMaxLimit_) {
continue;
}
}

if (joinInput_) {
auto findVid = preDst2VidsMap_.find(vid);
buildResult(findVid->second, curVertexProps, edgeProps, true);
} else {
buildResult(curVertexProps, edgeProps, true);
}
}
if (!preVisitedVids_.empty()) {
getNeighborsFromCache(dst2VidsMap, visitedVids, samples);
}
return Status::OK();
}

folly::Future<Status> ExpandAllExecutor::handleResponse(RpcResponse&& resps) {
NG_RETURN_IF_ERROR(handleCompleteness(resps, FLAGS_accept_partial_success));
List list;
Expand All @@ -216,13 +275,17 @@ folly::Future<Status> ExpandAllExecutor::handleResponse(RpcResponse&& resps) {
if (iter->numRows() == 0) {
return Status::OK();
}
auto size = iter->size();

std::vector<int64_t> samples;
if (sample_) {
auto size = iter->size();
algorithm::ReservoirSampling<int64_t> sampler(curMaxLimit_, size);
samples = sampler.samples();
std::sort(samples.begin(), samples.end(), [](int64_t a, int64_t b) { return a > b; });
}
if (currentStep_ > maxSteps_) {
return handleLastStep(iter.get(), samples);
}

QueryExpressionContext ctx(ectx_);
std::unordered_map<Value, std::unordered_set<Value>> dst2VidsMap;
Expand Down Expand Up @@ -312,25 +375,38 @@ folly::Future<Status> ExpandAllExecutor::handleResponse(RpcResponse&& resps) {

void ExpandAllExecutor::buildResult(const std::unordered_set<Value>& vids,
const List& vList,
const List& eList) {
if (vList.values.empty() && eList.values.empty()) {
return;
const List& eList,
bool isLastStep) {
std::vector<Value> list = vList.values;
if (!eList.values.empty()) {
if (isLastStep) {
list.insert(list.end(), eList.values.begin(), eList.values.end());
} else {
list.insert(list.end(), eList.values.begin(), eList.values.end() - 1);
}
}
for (auto& vid : vids) {
Row row;
row.values.emplace_back(vid);
row.values.insert(row.values.end(), vList.values.begin(), vList.values.end());
row.values.insert(row.values.end(), eList.values.begin(), eList.values.end() - 1);
if (!list.empty()) {
row.values.insert(row.values.end(),
std::make_move_iterator(list.begin()),
std::make_move_iterator(list.end()));
}
result_.rows.emplace_back(std::move(row));
}
}

void ExpandAllExecutor::buildResult(const List& vList, const List& eList) {
void ExpandAllExecutor::buildResult(const List& vList, const List& eList, bool isLastStep) {
if (vList.values.empty() && eList.values.empty()) {
return;
}
Row row = vList;
row.values.insert(row.values.end(), eList.values.begin(), eList.values.end() - 1);
if (isLastStep) {
row.values.insert(row.values.end(), eList.values.begin(), eList.values.end());
} else {
row.values.insert(row.values.end(), eList.values.begin(), eList.values.end() - 1);
}
result_.rows.emplace_back(std::move(row));
}

Expand Down
9 changes: 7 additions & 2 deletions src/graph/executor/query/ExpandAllExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ class ExpandAllExecutor final : public StorageAccessExecutor {
const Value& src,
const Value& dst);

void buildResult(const List& vList, const List& eList);
void buildResult(const List& vList, const List& eList, bool isLastStep = false);

void buildResult(const std::unordered_set<Value>& vids, const List& vList, const List& eList);
void buildResult(const std::unordered_set<Value>& vids,
const List& vList,
const List& eList,
bool isLastStep = false);

Status handleLastStep(GetNeighborsIter* iter, std::vector<int64_t>& samples);

using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsResponse>;
folly::Future<Status> handleResponse(RpcResponse&& resps);
Expand Down
19 changes: 14 additions & 5 deletions src/graph/executor/query/ExpandExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ folly::Future<Status> ExpandExecutor::getNeighbors() {
.thenValue([this](Status s) -> folly::Future<Status> {
NG_RETURN_IF_ERROR(s);
if (qctx()->isKilled()) {
return Status::OK();
return Status::Error("Execution had been killed");
}
if (currentStep_ < maxSteps_) {
if (!nextStepVids_.empty()) {
Expand All @@ -194,7 +194,7 @@ folly::Future<Status> ExpandExecutor::expandFromCache() {
for (; currentStep_ < maxSteps_; ++currentStep_) {
time::Duration expandTime;
if (qctx()->isKilled()) {
return Status::OK();
return Status::Error("Execution had been killed");
}
curLimit_ = 0;
curMaxLimit_ =
Expand Down Expand Up @@ -251,12 +251,15 @@ void ExpandExecutor::getNeighborsFromCache(
break;
}
}
updateDst2VidsMap(dst2VidsMap, vid, dst);
if (currentStep_ >= maxSteps_) {
continue;
}
if (adjDsts_.find(dst) == adjDsts_.end()) {
nextStepVids_.emplace(dst);
} else {
visitedVids.emplace(dst);
}
updateDst2VidsMap(dst2VidsMap, vid, dst);
}
}
}
Expand Down Expand Up @@ -295,7 +298,10 @@ folly::Future<Status> ExpandExecutor::handleResponse(RpcResponse&& resps) {
continue;
}
const auto& src = iter.getVid();
adjDsts_.emplace(src, dsts);
// do not cache in the last step
if (currentStep_ < maxSteps_) {
adjDsts_.emplace(src, dsts);
}

for (const auto& dst : dsts) {
if (sample_) {
Expand All @@ -312,13 +318,16 @@ folly::Future<Status> ExpandExecutor::handleResponse(RpcResponse&& resps) {
break;
}
}
updateDst2VidsMap(dst2VidsMap, src, dst);

if (currentStep_ >= maxSteps_) {
continue;
}
if (adjDsts_.find(dst) == adjDsts_.end()) {
nextStepVids_.emplace(dst);
} else {
visitedVids.emplace(dst);
}
updateDst2VidsMap(dst2VidsMap, src, dst);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/tck/features/go/GO.feature
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
@jmq
Feature: Go Sentence

Background:
Expand Down

0 comments on commit 0ec5e90

Please sign in to comment.