Skip to content

Commit

Permalink
Refactor go planner (#5369)
Browse files Browse the repository at this point in the history
* add expand

* add expand

* add expand

* add expand

* add expand

* add expand

* push filter down expand all

* fix error

* debug

* fix error

* inner hash in expandALL

* move getDstBySrc into Expand

* remove useless code

* fix ctest

* remove simple case code

* add comment

* fix argument error

* delete getdstbysrc

* add sample in expand & expandAll

* fix pushFilterDownHashInnerjoin

* add pushLimitDownExpandAllRule

* fix test case

* format

* add test case

* add stats info

* fix test case

* address comment

* fix error

* do not cache when last step

---------

Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com>
  • Loading branch information
nevermore3 and jievince authored Mar 13, 2023
1 parent b62168c commit e3e6916
Show file tree
Hide file tree
Showing 67 changed files with 2,996 additions and 2,655 deletions.
8 changes: 8 additions & 0 deletions src/common/algorithm/ReservoirSampling.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ class ReservoirSampling final {
samples_.reserve(num);
}

explicit ReservoirSampling(uint64_t num, uint64_t count) {
num_ = num;
samples_.reserve(num);
for (uint64_t i = 0; i < count; ++i) {
sampling(i);
}
}

bool sampling(T&& sample) {
if (cnt_ < num_) {
samples_.emplace_back(std::move(sample));
Expand Down
10 changes: 4 additions & 6 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,15 @@ struct GoContext final : AstContext {
bool joinInput{false};
// true when $$.tag.prop exist
bool joinDst{false};
// Optimize for some simple go sentence which only need dst id.
bool isSimple{false};
// The column name used by plan node`GetDstBySrc`
std::string dstIdColName{kDst};

ExpressionProps exprProps;

// save dst prop
YieldColumns* dstPropsExpr;
// save src and edge prop
YieldColumns* srcEdgePropsExpr;
// save src prop
YieldColumns* srcPropsExpr;
// save edge prop
YieldColumns* edgePropsExpr;
// for track vid in Nsteps
std::string srcVidColName;
std::string dstVidColName;
Expand Down
46 changes: 46 additions & 0 deletions src/graph/context/iterator/GetNbrsRespDataSetIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ void GetNbrsRespDataSetIter::buildPropIndex(const std::string& colName, size_t c
}
}

Value GetNbrsRespDataSetIter::getVid() const {
DCHECK(valid());
const Row& curRow = dataset_->rows[curRowIdx_];
return curRow[0];
}

Value GetNbrsRespDataSetIter::getVertex() const {
// Always check the valid() before getVertex
DCHECK(valid());
Expand Down Expand Up @@ -150,5 +156,45 @@ std::vector<Value> GetNbrsRespDataSetIter::getAdjEdges(VidHashSet* dstSet) const
return adjEdges;
}

std::unordered_set<Value> GetNbrsRespDataSetIter::getAdjDsts() const {
DCHECK(valid());

std::unordered_set<Value> adjDsts;
const Row& curRow = dataset_->rows[curRowIdx_];
for (const auto& [edgeName, propIdx] : edgePropsMap_) {
DCHECK_LT(propIdx.colIdx, curRow.size());
const Value& edgeColumn = curRow[propIdx.colIdx];
if (edgeColumn.isList()) {
for (const Value& edgeVal : edgeColumn.getList().values) {
// skip the edge direction symbol: `-/+`
auto name = edgeName.substr(1);
if (!edgeVal.isList() || edgeVal.getList().empty()) {
continue;
}
const List& propList = edgeVal.getList();
DCHECK_LT(propIdx.edgeDstIdx, propList.size());
auto& dst = propList[propIdx.edgeDstIdx];
adjDsts.emplace(dst);
}
}
}
return adjDsts;
}

size_t GetNbrsRespDataSetIter::size() {
size_t size = 0;
for (; valid(); next()) {
const Row& curRow = dataset_->rows[curRowIdx_];
for (const auto& [edgeName, propIdx] : edgePropsMap_) {
DCHECK_LT(propIdx.colIdx, curRow.size());
const Value& edgeColumn = curRow[propIdx.colIdx];
if (edgeColumn.isList()) {
size += edgeColumn.getList().values.size();
}
}
}
return size;
}

} // namespace graph
} // namespace nebula
6 changes: 6 additions & 0 deletions src/graph/context/iterator/GetNbrsRespDataSetIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class GetNbrsRespDataSetIter final {
Value getVertex() const;
std::vector<Value> getAdjEdges(VidHashSet* dstSet) const;

std::unordered_set<Value> getAdjDsts() const;

Value getVid() const;

size_t size();

private:
struct PropIndex {
size_t colIdx;
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ nebula_add_library(
query/ScanVerticesExecutor.cpp
query/ScanEdgesExecutor.cpp
query/TraverseExecutor.cpp
query/ExpandExecutor.cpp
query/ExpandAllExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
query/PatternApplyExecutor.cpp
query/GetDstBySrcExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
algo/ProduceAllPathsExecutor.cpp
Expand Down
12 changes: 8 additions & 4 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@
#include "graph/executor/query/AssignExecutor.h"
#include "graph/executor/query/DataCollectExecutor.h"
#include "graph/executor/query/DedupExecutor.h"
#include "graph/executor/query/ExpandAllExecutor.h"
#include "graph/executor/query/ExpandExecutor.h"
#include "graph/executor/query/FilterExecutor.h"
#include "graph/executor/query/FulltextIndexScanExecutor.h"
#include "graph/executor/query/GetDstBySrcExecutor.h"
#include "graph/executor/query/GetEdgesExecutor.h"
#include "graph/executor/query/GetNeighborsExecutor.h"
#include "graph/executor/query/GetVerticesExecutor.h"
Expand Down Expand Up @@ -197,6 +198,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kGetNeighbors: {
return pool->makeAndAdd<GetNeighborsExecutor>(node, qctx);
}
case PlanNode::Kind::kExpand: {
return pool->makeAndAdd<ExpandExecutor>(node, qctx);
}
case PlanNode::Kind::kExpandAll: {
return pool->makeAndAdd<ExpandAllExecutor>(node, qctx);
}
case PlanNode::Kind::kFulltextIndexScan: {
return pool->makeAndAdd<FulltextIndexScanExecutor>(node, qctx);
}
Expand Down Expand Up @@ -559,9 +566,6 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kShortestPath: {
return pool->makeAndAdd<ShortestPathExecutor>(node, qctx);
}
case PlanNode::Kind::kGetDstBySrc: {
return pool->makeAndAdd<GetDstBySrcExecutor>(node, qctx);
}
case PlanNode::Kind::kUnknown: {
DLOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/logic/ArgumentExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ folly::Future<Status> ArgumentExecutor::execute() {
DCHECK(iter != nullptr);

const auto &successor = successors();
auto sucessorExecutor = *successor.begin();
bool flag = sucessorExecutor->node()->kind() != PlanNode::Kind::kGetVertices;
auto kind = (*successor.begin())->node()->kind();
bool flag = (kind != PlanNode::Kind::kGetVertices && kind != PlanNode::Kind::kExpand);

DataSet ds;
ds.colNames = argNode->colNames();
Expand Down
51 changes: 0 additions & 51 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ folly::Future<Status> DataCollectExecutor::doCollect() {
NG_RETURN_IF_ERROR(rowBasedMove(vars));
break;
}
case DataCollect::DCKind::kMToN: {
NG_RETURN_IF_ERROR(collectMToN(vars, dc->step(), dc->distinct()));
break;
}
case DataCollect::DCKind::kBFSShortest: {
NG_RETURN_IF_ERROR(collectBFSShortest(vars));
break;
Expand Down Expand Up @@ -124,53 +120,6 @@ Status DataCollectExecutor::rowBasedMove(const std::vector<std::string>& vars) {
return Status::OK();
}

Status DataCollectExecutor::collectMToN(const std::vector<std::string>& vars,
const StepClause& mToN,
bool distinct) {
DataSet ds;
ds.colNames = std::move(colNames_);
DCHECK(!ds.colNames.empty());
robin_hood::unordered_flat_set<const Row*, std::hash<const Row*>> unique;
// itersHolder keep life cycle of iters util this method return.
std::vector<std::unique_ptr<Iterator>> itersHolder;
for (auto& var : vars) {
auto& hist = ectx_->getHistory(var);
std::size_t histSize = hist.size();
DCHECK_GE(mToN.mSteps(), 1);
std::size_t n = mToN.nSteps() > histSize ? histSize : mToN.nSteps();
for (auto i = mToN.mSteps() - 1; i < n; ++i) {
auto iter = hist[i].iter();
if (iter->isSequentialIter()) {
auto* seqIter = static_cast<SequentialIter*>(iter.get());
unique.reserve(seqIter->size());
while (seqIter->valid()) {
if (distinct && !unique.emplace(seqIter->row()).second) {
seqIter->unstableErase();
} else {
seqIter->next();
}
}
} else {
std::stringstream msg;
msg << "Iterator should be kind of SequentialIter, but was: " << iter->kind();
return Status::Error(msg.str());
}
itersHolder.emplace_back(std::move(iter));
}
}

for (auto& iter : itersHolder) {
if (iter->isSequentialIter()) {
auto* seqIter = static_cast<SequentialIter*>(iter.get());
for (seqIter->reset(); seqIter->valid(); seqIter->next()) {
ds.rows.emplace_back(seqIter->moveRow());
}
}
}
result_.setDataSet(std::move(ds));
return Status::OK();
}

Status DataCollectExecutor::collectBFSShortest(const std::vector<std::string>& vars) {
// Will rewrite this method once we implement returning the props for the
// path.
Expand Down
5 changes: 0 additions & 5 deletions src/graph/executor/query/DataCollectExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
//
// `rowBaseMove` : Collect the latest version of the results
//
// `collectMToN` : Only used in GO MToN scenarios. Collect the results generated by MTON
// steps (multi-version)
//
// `collectAllPaths` : Only used in FindPath scenarios. Collect the paths generated by the FindPath
// operator (multi-version)
//
Expand All @@ -42,8 +39,6 @@ class DataCollectExecutor final : public Executor {

Status rowBasedMove(const std::vector<std::string>& vars);

Status collectMToN(const std::vector<std::string>& vars, const StepClause& mToN, bool distinct);

Status collectBFSShortest(const std::vector<std::string>& vars);

Status collectAllPaths(const std::vector<std::string>& vars);
Expand Down
Loading

0 comments on commit e3e6916

Please sign in to comment.