Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push filter down GetProps. #3844

Merged
Merged
52 changes: 43 additions & 9 deletions src/storage/exec/GetPropNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ class GetTagPropNode : public QueryNode<VertexID> {
GetTagPropNode(RuntimeContext* context,
std::vector<TagNode*> tagNodes,
nebula::DataSet* resultDataSet,
Expression* filter,
std::size_t limit)
: context_(context),
tagNodes_(std::move(tagNodes)),
resultDataSet_(resultDataSet),
expCtx_(filter == nullptr
? nullptr
: new StorageExpressionContext(context->vIdLen(), context->isIntId())),
filter_(filter),
limit_(limit) {
name_ = "GetTagPropNode";
}
Expand Down Expand Up @@ -63,19 +68,24 @@ class GetTagPropNode : public QueryNode<VertexID> {
auto isIntId = context_->isIntId();
for (auto* tagNode : tagNodes_) {
ret = tagNode->collectTagPropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, tagNode, this](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, tagNode, this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) {
auto status = QueryUtils::collectVertexProps(
key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName());
if (!status.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand All @@ -84,14 +94,21 @@ class GetTagPropNode : public QueryNode<VertexID> {
return ret;
}
}
resultDataSet_->rows.emplace_back(std::move(row));
if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
if (expCtx_ != nullptr) {
expCtx_->clear();
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

private:
RuntimeContext* context_;
std::vector<TagNode*> tagNodes_;
nebula::DataSet* resultDataSet_;
std::unique_ptr<StorageExpressionContext> expCtx_{nullptr};
Expression* filter_{nullptr};
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

Expand All @@ -102,10 +119,15 @@ class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
GetEdgePropNode(RuntimeContext* context,
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes,
nebula::DataSet* resultDataSet,
Expression* filter,
std::size_t limit)
: context_(context),
edgeNodes_(std::move(edgeNodes)),
resultDataSet_(resultDataSet),
expCtx_(filter == nullptr
? nullptr
: new StorageExpressionContext(context->vIdLen(), context->isIntId())),
filter_(filter),
limit_(limit) {
QueryNode::name_ = "GetEdgePropNode";
}
Expand All @@ -124,35 +146,47 @@ class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
auto isIntId = context_->isIntId();
for (auto* edgeNode : edgeNodes_) {
ret = edgeNode->collectEdgePropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, edgeNode, this](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, edgeNode, this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
auto status = QueryUtils::collectEdgeProps(
key, vIdLen, isIntId, reader, props, row, expCtx_.get(), edgeNode->getEdgeName());
if (!status.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
}
resultDataSet_->rows.emplace_back(std::move(row));
if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
if (expCtx_ != nullptr) {
expCtx_->clear();
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

private:
RuntimeContext* context_;
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes_;
nebula::DataSet* resultDataSet_;
std::unique_ptr<StorageExpressionContext> expCtx_{nullptr};
Expression* filter_{nullptr};
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

Expand Down
23 changes: 10 additions & 13 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,16 @@ class HashJoinNode : public IterateNode<VertexID> {
nebula::List list;
list.reserve(props->size());
const auto& tagName = tagNode->getTagName();
for (const auto& prop : *props) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readVertexProp(
key, context_->vIdLen(), context_->isIntId(), reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagName, prop.name_, value.value());
}
if (prop.returned_) {
list.emplace_back(std::move(value).value());
}
auto status = QueryUtils::collectVertexProps(key,
context_->vIdLen(),
context_->isIntId(),
reader,
props,
list,
expCtx_,
tagName);
if (!status.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
result.values.emplace_back(std::move(list));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
41 changes: 29 additions & 12 deletions src/storage/exec/QueryUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
#include "common/expression/Expression.h"
#include "common/utils/DefaultValueContext.h"
#include "storage/CommonUtils.h"
#include "storage/context/StorageExpressionContext.h"
#include "storage/query/QueryBaseProcessor.h"

namespace nebula {
namespace storage {

class QueryUtils final {
public:
static inline bool vTrue(const Value& v) {
return v.isBool() && v.getBool();
}

enum class ReturnColType : uint16_t {
kVid,
kTag,
Expand Down Expand Up @@ -165,15 +170,21 @@ class QueryUtils final {
bool isIntId,
RowReader* reader,
const std::vector<PropContext>* props,
nebula::List& list) {
nebula::List& list,
StorageExpressionContext* expCtx = nullptr,
const std::string& tagName = "") {
for (const auto& prop : *props) {
if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) {
continue;
}
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
NG_RETURN_IF_ERROR(value);
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return value.status();
}
list.emplace_back(std::move(value).value());
list.emplace_back(value.value());
}
if (prop.filtered_ && expCtx != nullptr) {
expCtx->setTagProp(tagName, prop.name_, std::move(value).value());
}
}
return Status::OK();
Expand All @@ -184,15 +195,21 @@ class QueryUtils final {
bool isIntId,
RowReader* reader,
const std::vector<PropContext>* props,
nebula::List& list) {
nebula::List& list,
StorageExpressionContext* expCtx = nullptr,
const std::string& edgeName = "") {
for (const auto& prop : *props) {
if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) {
continue;
}
auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop);
NG_RETURN_IF_ERROR(value);
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return value.status();
}
list.emplace_back(std::move(value).value());
list.emplace_back(value.value());
}
if (prop.filtered_ && expCtx != nullptr) {
expCtx->setEdgeProp(edgeName, prop.name_, std::move(value).value());
}
}
return Status::OK();
Expand Down
8 changes: 2 additions & 6 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ namespace storage {

using Cursor = std::string;

inline bool vTrue(const Value& v) {
return v.isBool() && v.getBool();
}

// Node to scan vertices of one partition
class ScanVertexPropNode : public QueryNode<Cursor> {
public:
Expand Down Expand Up @@ -154,7 +150,7 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
(filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
Expand Down Expand Up @@ -295,7 +291,7 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
(filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
Expand Down
22 changes: 18 additions & 4 deletions src/storage/query/GetPropProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ StoragePlan<VertexID> GetPropProcessor::buildTagPlan(RuntimeContext* context,
tags.emplace_back(tag.get());
plan.addNode(std::move(tag));
}
auto output = std::make_unique<GetTagPropNode>(context, tags, result, limit_);
auto output = std::make_unique<GetTagPropNode>(context, tags, result, filter_, limit_);
for (auto* tag : tags) {
output->addDependency(tag);
}
Expand All @@ -219,7 +219,7 @@ StoragePlan<cpp2::EdgeKey> GetPropProcessor::buildEdgePlan(RuntimeContext* conte
edges.emplace_back(edge.get());
plan.addNode(std::move(edge));
}
auto output = std::make_unique<GetEdgePropNode>(context, edges, result, limit_);
auto output = std::make_unique<GetEdgePropNode>(context, edges, result, filter_, limit_);
for (auto* edge : edges) {
output->addDependency(edge);
}
Expand Down Expand Up @@ -251,14 +251,28 @@ nebula::cpp2::ErrorCode GetPropProcessor::checkAndBuildContexts(const cpp2::GetP
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
return buildTagContext(req);
code = buildTagContext(req);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
} else {
code = getSpaceEdgeSchema();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
return buildEdgeContext(req);
code = buildEdgeContext(req);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
}
code = buildFilter(req, [](const cpp2::GetPropRequest& r) -> const std::string* {
if (r.filter_ref().has_value()) {
return r.get_filter();
} else {
return nullptr;
}
});
return code;
}

nebula::cpp2::ErrorCode GetPropProcessor::buildTagContext(const cpp2::GetPropRequest& req) {
Expand Down
Loading