Skip to content

Commit

Permalink
add GroupTagNode
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu85cn committed Dec 27, 2021
1 parent 6300233 commit 0dc277c
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 65 deletions.
4 changes: 4 additions & 0 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ struct RuntimeContext {
// used for update
bool insert_ = false;

// some times, one line is filter out but still return (has edge)
// and some time, this line is just removed from thre return result
bool filterInvalidResultOut = false;

ResultStatus resultStat_{ResultStatus::NORMAL};
};

Expand Down
48 changes: 36 additions & 12 deletions src/storage/exec/FilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
namespace nebula {
namespace storage {

enum class FilterMode {
TAG_AND_EDGE = 0,
TAG_ONLY = 1,
};

/*
FilterNode will receive the result from upstream, check whether tag data or edge
data could pass the expression filter. FilterNode can only accept one upstream
Expand Down Expand Up @@ -59,27 +64,46 @@ class FilterNode : public IterateNode<T> {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void setFilterMode(FilterMode mode) { mode_ = mode; }

private:
// return true when the value iter points to a value which can filter
bool check() override {
if (filterExp_ != nullptr) {
expCtx_->reset(this->reader(), this->key().str());
// result is false when filter out
auto result = filterExp_->eval(*expCtx_);
// NULL is always false
auto ret = result.toBool();
if (ret.isBool() && ret.getBool()) {
return true;
}
return false;
if (filterExp_ == nullptr) {
return true;
}
switch (mode_) {
case FilterMode::TAG_AND_EDGE:
return checkTagAndEdge();
case FilterMode::TAG_ONLY:
return checkTagOnly();
default:
return checkTagAndEdge();
}
return true;
}

bool checkTagOnly() {
auto result = filterExp_->eval(*expCtx_);
// NULL is always false
auto ret = result.toBool();
return ret.isBool() && ret.getBool();
}

// return true when the value iter points to a value which can filter
bool checkTagAndEdge() {
expCtx_->reset(this->reader(), this->key().str());
// result is false when filter out
auto result = filterExp_->eval(*expCtx_);
// NULL is always false
auto ret = result.toBool();
return ret.isBool() && ret.getBool();
}

private:
RuntimeContext* context_;
StorageExpressionContext* expCtx_;
Expression* filterExp_;
FilterMode mode_{FilterMode::TAG_AND_EDGE};
int32_t callCheck{0};
};

} // namespace storage
Expand Down
10 changes: 9 additions & 1 deletion src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,22 @@ class GetNeighborsNode : public QueryNode<VertexID> {
row[1].setList(agg->mutableResult().moveList());
}

resultDataSet_->rows.emplace_back(std::move(row));
// only set filterInvalidResultOut = true in TagOnly mode
// so if it it an edge, this test is always true
if (!context_->filterInvalidResultOut || context_->resultStat_ == ResultStatus::NORMAL) {
resultDataSet_->rows.emplace_back(std::move(row));
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

protected:
GetNeighborsNode() = default;

virtual nebula::cpp2::ErrorCode iterateEdges(std::vector<Value>& row) {
if (edgeContext_->propContexts_.empty()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
int64_t edgeRowCount = 0;
nebula::List list;
for (; upstream_->valid(); upstream_->next(), ++edgeRowCount) {
Expand Down
117 changes: 117 additions & 0 deletions src/storage/exec/MultiTagNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include "common/base/Base.h"
#include "storage/context/StorageExpressionContext.h"
#include "storage/exec/EdgeNode.h"
#include "storage/exec/StorageIterator.h"
#include "storage/exec/TagNode.h"

namespace nebula {
namespace storage {

// MultiTagNode is a replace ment of HashJoinNode
// in execution of "go over"
// if Graph don't pass any Edge prop
class MultiTagNode : public IterateNode<VertexID> {
public:
using RelNode::doExecute;

MultiTagNode(RuntimeContext* context,
const std::vector<TagNode*>& tagNodes,
StorageExpressionContext* expCtx)
: context_(context), tagNodes_(tagNodes), expCtx_(expCtx) {
IterateNode::name_ = "MultiTagNode";
}

nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}

if (expCtx_ != nullptr) {
expCtx_->clear();
}
result_.setList(nebula::List());
auto& result = result_.mutableList();
if (context_->resultStat_ == ResultStatus::ILLEGAL_DATA) {
return nebula::cpp2::ErrorCode::E_INVALID_DATA;
}

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
[&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
result.values.emplace_back(Value());
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[this, &result, tagNode](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
nebula::List list;
list.reserve(props->size());
const auto& tagName = tagNode->getTagName();
for (const auto& prop : *props) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readVertexProp(
key, context_->vIdLen(), context_->isIntId(), reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagName, prop.name_, value.value());
}
if (prop.returned_) {
list.emplace_back(std::move(value).value());
}
}
result.values.emplace_back(std::move(list));
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

bool valid() const override {
// return valid_;
auto ret = tagNodes_.back()->valid();
return ret;
}

void next() override { tagNodes_.back()->next(); }

folly::StringPiece key() const override {
LOG(FATAL) << "not allowed to do this";
return "";
}

folly::StringPiece val() const override {
LOG(FATAL) << "not allowed to do this";
return "";
}

RowReader* reader() const override {
LOG(FATAL) << "not allowed to do this";
return nullptr;
}

private:
RuntimeContext* context_;
std::vector<TagNode*> tagNodes_;
StorageExpressionContext* expCtx_;
};

} // namespace storage
} // namespace nebula
44 changes: 30 additions & 14 deletions src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "storage/exec/FilterNode.h"
#include "storage/exec/GetNeighborsNode.h"
#include "storage/exec/HashJoinNode.h"
#include "storage/exec/MultiTagNode.h"
#include "storage/exec/TagNode.h"

namespace nebula {
Expand Down Expand Up @@ -79,6 +80,7 @@ void GetNeighborsProcessor::runInSingleThread(const cpp2::GetNeighborsRequest& r
auto plan = buildPlan(&contexts_.front(), &expCtxs_.front(), &resultDataSet_, limit, random);
std::unordered_set<PartitionID> failedParts;
for (const auto& partEntry : req.get_parts()) {
contexts_.front().resultStat_ = ResultStatus::NORMAL;
auto partId = partEntry.first;
for (const auto& row : partEntry.second) {
CHECK_GE(row.values.size(), 1);
Expand Down Expand Up @@ -217,23 +219,39 @@ StoragePlan<VertexID> GetNeighborsProcessor::buildPlan(RuntimeContext* context,
plan.addNode(std::move(edge));
}

auto hashJoin =
std::make_unique<HashJoinNode>(context, tags, edges, &tagContext_, &edgeContext_, expCtx);
for (auto* tag : tags) {
hashJoin->addDependency(tag);
}
for (auto* edge : edges) {
hashJoin->addDependency(edge);
IterateNode<VertexID>* upstream = nullptr;
IterateNode<VertexID>* join = nullptr;
if (!edges.empty()) {
auto hashJoin =
std::make_unique<HashJoinNode>(context, tags, edges, &tagContext_, &edgeContext_, expCtx);
for (auto* tag : tags) {
hashJoin->addDependency(tag);
}
for (auto* edge : edges) {
hashJoin->addDependency(edge);
}
join = hashJoin.get();
upstream = hashJoin.get();
plan.addNode(std::move(hashJoin));
} else {
context->filterInvalidResultOut = true;
auto groupNode = std::make_unique<MultiTagNode>(context, tags, expCtx);
for (auto* tag : tags) {
groupNode->addDependency(tag);
}
join = groupNode.get();
upstream = groupNode.get();
plan.addNode(std::move(groupNode));
}
IterateNode<VertexID>* join = hashJoin.get();
IterateNode<VertexID>* upstream = hashJoin.get();
plan.addNode(std::move(hashJoin));

if (filter_) {
auto filter =
std::make_unique<FilterNode<VertexID>>(context, upstream, expCtx, filter_->clone());
filter->addDependency(upstream);
upstream = filter.get();
if (edges.empty()) {
filter.get()->setFilterMode(FilterMode::TAG_ONLY);
}
plan.addNode(std::move(filter));
}

Expand Down Expand Up @@ -302,8 +320,7 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::buildTagContext(const cpp2::Trave
// If the list is not given, no prop will be returned.
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
auto returnProps =
(*req.vertex_props_ref()).empty() ? buildAllTagProps() : *req.vertex_props_ref();
auto returnProps = *req.vertex_props_ref();
auto ret = handleVertexProps(returnProps);

if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -320,8 +337,7 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::buildEdgeContext(const cpp2::Trav
// If the list is not given, no prop will be returned.
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
auto returnProps = (*req.edge_props_ref()).empty() ? buildAllEdgeProps(*req.edge_direction_ref())
: *req.edge_props_ref();
auto returnProps = *req.edge_props_ref();
auto ret = handleEdgeProps(returnProps);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
Expand Down
Loading

0 comments on commit 0dc277c

Please sign in to comment.