Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust get neighbor plan #3458

Merged
merged 6 commits into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ struct RuntimeContext {
// used for update
bool insert_ = false;

// some times, one line is filter out but still return (has edge)
// and some time, this line is just removed from the return result
bool filterInvalidResultOut = false;

ResultStatus resultStat_{ResultStatus::NORMAL};
};

Expand Down
50 changes: 38 additions & 12 deletions src/storage/exec/FilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
namespace nebula {
namespace storage {

enum class FilterMode {
TAG_AND_EDGE = 0,
TAG_ONLY = 1,
};

/*
FilterNode will receive the result from upstream, check whether tag data or edge
data could pass the expression filter. FilterNode can only accept one upstream
Expand Down Expand Up @@ -59,27 +64,48 @@ class FilterNode : public IterateNode<T> {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void setFilterMode(FilterMode mode) {
mode_ = mode;
}

private:
// return true when the value iter points to a value which can filter
bool check() override {
if (filterExp_ != nullptr) {
expCtx_->reset(this->reader(), this->key().str());
// result is false when filter out
auto result = filterExp_->eval(*expCtx_);
// NULL is always false
auto ret = result.toBool();
if (ret.isBool() && ret.getBool()) {
return true;
}
return false;
if (filterExp_ == nullptr) {
return true;
}
return true;
switch (mode_) {
case FilterMode::TAG_AND_EDGE:
return checkTagAndEdge();
case FilterMode::TAG_ONLY:
return checkTagOnly();
default:
return checkTagAndEdge();
}
}

bool checkTagOnly() {
auto result = filterExp_->eval(*expCtx_);
// NULL is always false
auto ret = result.toBool();
return ret.isBool() && ret.getBool();
}

// return true when the value iter points to a value which can filter
bool checkTagAndEdge() {
expCtx_->reset(this->reader(), this->key().str());
// result is false when filter out
auto result = filterExp_->eval(*expCtx_);
// NULL is always false
auto ret = result.toBool();
return ret.isBool() && ret.getBool();
}

private:
RuntimeContext* context_;
StorageExpressionContext* expCtx_;
Expression* filterExp_;
FilterMode mode_{FilterMode::TAG_AND_EDGE};
int32_t callCheck{0};
};

} // namespace storage
Expand Down
10 changes: 9 additions & 1 deletion src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,22 @@ class GetNeighborsNode : public QueryNode<VertexID> {
row[1].setList(agg->mutableResult().moveList());
}

resultDataSet_->rows.emplace_back(std::move(row));
// only set filterInvalidResultOut = true in TagOnly mode
// so if it it an edge, this test is always true
if (!context_->filterInvalidResultOut || context_->resultStat_ == ResultStatus::NORMAL) {
liuyu85cn marked this conversation as resolved.
Show resolved Hide resolved
resultDataSet_->rows.emplace_back(std::move(row));
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

protected:
GetNeighborsNode() = default;

virtual nebula::cpp2::ErrorCode iterateEdges(std::vector<Value>& row) {
if (edgeContext_->propContexts_.empty()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
int64_t edgeRowCount = 0;
nebula::List list;
for (; upstream_->valid(); upstream_->next(), ++edgeRowCount) {
Expand Down
118 changes: 118 additions & 0 deletions src/storage/exec/MultiTagNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include "common/base/Base.h"
#include "storage/context/StorageExpressionContext.h"
#include "storage/exec/EdgeNode.h"
#include "storage/exec/StorageIterator.h"
#include "storage/exec/TagNode.h"

namespace nebula {
namespace storage {

// MultiTagNode is a replacement of HashJoinNode
// in execution of "go over"
// if Graph don't pass any Edge prop
class MultiTagNode : public IterateNode<VertexID> {
public:
using RelNode::doExecute;

MultiTagNode(RuntimeContext* context,
const std::vector<TagNode*>& tagNodes,
StorageExpressionContext* expCtx)
: context_(context), tagNodes_(tagNodes), expCtx_(expCtx) {
IterateNode::name_ = "MultiTagNode";
}

nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}

if (expCtx_ != nullptr) {
expCtx_->clear();
}
result_.setList(nebula::List());
auto& result = result_.mutableList();
if (context_->resultStat_ == ResultStatus::ILLEGAL_DATA) {
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
[&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
result.values.emplace_back(Value());
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[this, &result, tagNode](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
nebula::List list;
list.reserve(props->size());
const auto& tagName = tagNode->getTagName();
for (const auto& prop : *props) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readVertexProp(
key, context_->vIdLen(), context_->isIntId(), reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagName, prop.name_, value.value());
}
if (prop.returned_) {
list.emplace_back(std::move(value).value());
}
}
result.values.emplace_back(std::move(list));
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

bool valid() const override {
auto ret = tagNodes_.back()->valid();
liuyu85cn marked this conversation as resolved.
Show resolved Hide resolved
return ret;
}

void next() override {
tagNodes_.back()->next();
}

folly::StringPiece key() const override {
LOG(FATAL) << "not allowed to do this";
return "";
}

folly::StringPiece val() const override {
LOG(FATAL) << "not allowed to do this";
return "";
}

RowReader* reader() const override {
LOG(FATAL) << "not allowed to do this";
return nullptr;
}

private:
RuntimeContext* context_;
std::vector<TagNode*> tagNodes_;
StorageExpressionContext* expCtx_;
};

} // namespace storage
} // namespace nebula
88 changes: 56 additions & 32 deletions src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "storage/exec/FilterNode.h"
#include "storage/exec/GetNeighborsNode.h"
#include "storage/exec/HashJoinNode.h"
#include "storage/exec/MultiTagNode.h"
#include "storage/exec/TagNode.h"

namespace nebula {
Expand Down Expand Up @@ -79,6 +80,7 @@ void GetNeighborsProcessor::runInSingleThread(const cpp2::GetNeighborsRequest& r
auto plan = buildPlan(&contexts_.front(), &expCtxs_.front(), &resultDataSet_, limit, random);
std::unordered_set<PartitionID> failedParts;
for (const auto& partEntry : req.get_parts()) {
contexts_.front().resultStat_ = ResultStatus::NORMAL;
auto partId = partEntry.first;
for (const auto& row : partEntry.second) {
CHECK_GE(row.values.size(), 1);
Expand Down Expand Up @@ -184,24 +186,32 @@ StoragePlan<VertexID> GetNeighborsProcessor::buildPlan(RuntimeContext* context,
bool random) {
/*
The StoragePlan looks like this:
+--------+---------+
| GetNeighborsNode |
+--------+---------+
|
+--------+---------+
| AggregateNode |
+--------+---------+
|
+--------+---------+
| FilterNode |
+--------+---------+
|
+--------+---------+
+-->+ HashJoinNode +<----+
| +------------------+ |
+--------+---------+ +---------+--------+
| TagNodes | | EdgeNodes |
+------------------+ +------------------+
+------------------+ or, if there is no edge:
| GetNeighborsNode |
+--------+---------+ +-----------------+
| |GetNeighborsNode |
+--------+---------+ +--------+--------+
| AggregateNode | |
+--------+---------+ +------+------+
| |AggregateNode|
+--------+---------+ +------+------+
| FilterNode | |
+--------+---------+ +-----+----+
| |FilterNode|
+--------+---------+ +-----+----+
+-->+ HashJoinNode +<----+ |
| +------------------+ | +------+-----+
+--------+---------+ +---------+--------+ |HashJoinNode|
| TagNodes | | EdgeNodes | +------+-----+
+------------------+ +------------------+ |
+------+-----+
|MultiTagNode|
+------+-----+
|
+----+---+
|TagNodes|
+--------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

*/
StoragePlan<VertexID> plan;
std::vector<TagNode*> tags;
Expand All @@ -217,23 +227,39 @@ StoragePlan<VertexID> GetNeighborsProcessor::buildPlan(RuntimeContext* context,
plan.addNode(std::move(edge));
}

auto hashJoin =
std::make_unique<HashJoinNode>(context, tags, edges, &tagContext_, &edgeContext_, expCtx);
for (auto* tag : tags) {
hashJoin->addDependency(tag);
}
for (auto* edge : edges) {
hashJoin->addDependency(edge);
IterateNode<VertexID>* upstream = nullptr;
IterateNode<VertexID>* join = nullptr;
if (!edges.empty()) {
auto hashJoin =
std::make_unique<HashJoinNode>(context, tags, edges, &tagContext_, &edgeContext_, expCtx);
for (auto* tag : tags) {
hashJoin->addDependency(tag);
}
for (auto* edge : edges) {
hashJoin->addDependency(edge);
}
join = hashJoin.get();
upstream = hashJoin.get();
plan.addNode(std::move(hashJoin));
} else {
context->filterInvalidResultOut = true;
auto groupNode = std::make_unique<MultiTagNode>(context, tags, expCtx);
for (auto* tag : tags) {
groupNode->addDependency(tag);
}
join = groupNode.get();
upstream = groupNode.get();
plan.addNode(std::move(groupNode));
}
IterateNode<VertexID>* join = hashJoin.get();
IterateNode<VertexID>* upstream = hashJoin.get();
plan.addNode(std::move(hashJoin));

if (filter_) {
auto filter =
std::make_unique<FilterNode<VertexID>>(context, upstream, expCtx, filter_->clone());
filter->addDependency(upstream);
upstream = filter.get();
if (edges.empty()) {
filter.get()->setFilterMode(FilterMode::TAG_ONLY);
}
plan.addNode(std::move(filter));
}

Expand Down Expand Up @@ -302,8 +328,7 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::buildTagContext(const cpp2::Trave
// If the list is not given, no prop will be returned.
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
auto returnProps =
(*req.vertex_props_ref()).empty() ? buildAllTagProps() : *req.vertex_props_ref();
auto returnProps = *req.vertex_props_ref();
liuyu85cn marked this conversation as resolved.
Show resolved Hide resolved
auto ret = handleVertexProps(returnProps);

if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -320,8 +345,7 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::buildEdgeContext(const cpp2::Trav
// If the list is not given, no prop will be returned.
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
auto returnProps = (*req.edge_props_ref()).empty() ? buildAllEdgeProps(*req.edge_direction_ref())
: *req.edge_props_ref();
auto returnProps = *req.edge_props_ref();
auto ret = handleEdgeProps(returnProps);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
Expand Down
Loading