From 55524d3eacebe5010a2e23a480e47616cc98e86a Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 28 Jan 2022 16:32:49 +0800 Subject: [PATCH 1/2] Push limit down GetProps. --- src/storage/exec/GetPropNode.h | 88 +++++++++++++++++---- src/storage/exec/QueryUtils.h | 4 + src/storage/exec/ScanNode.h | 8 +- src/storage/query/GetPropProcessor.cpp | 22 +++++- src/storage/test/GetPropTest.cpp | 101 ++++++++++++++++++++++++- 5 files changed, 197 insertions(+), 26 deletions(-) diff --git a/src/storage/exec/GetPropNode.h b/src/storage/exec/GetPropNode.h index 872b1556d12..dd96a24d032 100644 --- a/src/storage/exec/GetPropNode.h +++ b/src/storage/exec/GetPropNode.h @@ -19,8 +19,15 @@ class GetTagPropNode : public QueryNode { GetTagPropNode(RuntimeContext* context, std::vector tagNodes, - nebula::DataSet* resultDataSet) - : context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet) { + nebula::DataSet* resultDataSet, + Expression* filter) + : context_(context), + tagNodes_(std::move(tagNodes)), + resultDataSet_(resultDataSet), + expCtx_(filter == nullptr + ? 
nullptr + : new StorageExpressionContext(context->vIdLen(), context->isIntId())), + filter_(filter) { name_ = "GetTagPropNode"; } @@ -56,20 +63,36 @@ class GetTagPropNode : public QueryNode { auto isIntId = context_->isIntId(); for (auto* tagNode : tagNodes_) { ret = tagNode->collectTagPropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + [&row, tagNode, this](const std::vector* props) -> nebula::cpp2::ErrorCode { for (const auto& prop : *props) { if (prop.returned_) { row.emplace_back(Value()); } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }, - [&row, vIdLen, isIntId]( + [&row, vIdLen, isIntId, tagNode, this]( folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) { - return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + for (const auto& prop : *props) { + if (!(prop.returned_ || (prop.filtered_ && expCtx_ != nullptr))) { + continue; + } + auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); + if (!value.ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + } + if (prop.returned_) { + VLOG(2) << "Collect prop " << prop.name_; + row.emplace_back(value.value()); + } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setTagProp(tagNode->getTagName(), prop.name_, std::move(value).value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }); @@ -77,7 +100,12 @@ class GetTagPropNode : public QueryNode { return ret; } } - resultDataSet_->rows.emplace_back(std::move(row)); + if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + if (expCtx_ != nullptr) { + expCtx_->clear(); + } return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -85,6 +113,8 @@ class GetTagPropNode : public QueryNode { 
RuntimeContext* context_; std::vector tagNodes_; nebula::DataSet* resultDataSet_; + std::unique_ptr expCtx_{nullptr}; + Expression* filter_{nullptr}; }; class GetEdgePropNode : public QueryNode { @@ -93,8 +123,15 @@ class GetEdgePropNode : public QueryNode { GetEdgePropNode(RuntimeContext* context, std::vector*> edgeNodes, - nebula::DataSet* resultDataSet) - : context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet) { + nebula::DataSet* resultDataSet, + Expression* filter) + : context_(context), + edgeNodes_(std::move(edgeNodes)), + resultDataSet_(resultDataSet), + expCtx_(filter == nullptr + ? nullptr + : new StorageExpressionContext(context->vIdLen(), context->isIntId())), + filter_(filter) { QueryNode::name_ = "GetEdgePropNode"; } @@ -109,20 +146,36 @@ class GetEdgePropNode : public QueryNode { auto isIntId = context_->isIntId(); for (auto* edgeNode : edgeNodes_) { ret = edgeNode->collectEdgePropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + [&row, edgeNode, this](const std::vector* props) -> nebula::cpp2::ErrorCode { for (const auto& prop : *props) { if (prop.returned_) { row.emplace_back(Value()); } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }, - [&row, vIdLen, isIntId]( + [&row, vIdLen, isIntId, edgeNode, this]( folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { - return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + for (const auto& prop : *props) { + if (!(prop.returned_ || (prop.filtered_ && expCtx_ != nullptr))) { + continue; + } + auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); + if (!value.ok()) { + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + if (prop.returned_) { + VLOG(2) << "Collect prop " << 
prop.name_; + row.emplace_back(value.value()); + } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, std::move(value).value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }); @@ -130,7 +183,12 @@ class GetEdgePropNode : public QueryNode { return ret; } } - resultDataSet_->rows.emplace_back(std::move(row)); + if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + if (expCtx_ != nullptr) { + expCtx_->clear(); + } return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -138,6 +196,8 @@ class GetEdgePropNode : public QueryNode { RuntimeContext* context_; std::vector*> edgeNodes_; nebula::DataSet* resultDataSet_; + std::unique_ptr expCtx_{nullptr}; + Expression* filter_{nullptr}; }; } // namespace storage diff --git a/src/storage/exec/QueryUtils.h b/src/storage/exec/QueryUtils.h index 948235802b9..b595fea00b5 100644 --- a/src/storage/exec/QueryUtils.h +++ b/src/storage/exec/QueryUtils.h @@ -17,6 +17,10 @@ namespace storage { class QueryUtils final { public: + static inline bool vTrue(const Value& v) { + return v.isBool() && v.getBool(); + } + enum class ReturnColType : uint16_t { kVid, kTag, diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 61e6c94d30d..4bbc772a8cc 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -13,10 +13,6 @@ namespace storage { using Cursor = std::string; -inline bool vTrue(const Value& v) { - return v.isBool() && v.getBool(); -} - // Node to scan vertices of one partition class ScanVertexPropNode : public QueryNode { public: @@ -153,7 +149,7 @@ class ScanVertexPropNode : public QueryNode { } } if (ret == nebula::cpp2::ErrorCode::SUCCEEDED && - (filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) { + (filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) { resultDataSet_->rows.emplace_back(std::move(row)); } expCtx_->clear(); @@ -294,7 +290,7 @@ 
class ScanEdgePropNode : public QueryNode { } } if (ret == nebula::cpp2::ErrorCode::SUCCEEDED && - (filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) { + (filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) { resultDataSet_->rows.emplace_back(std::move(row)); } expCtx_->clear(); diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp index 9617e23e7d9..3c2f70636eb 100644 --- a/src/storage/query/GetPropProcessor.cpp +++ b/src/storage/query/GetPropProcessor.cpp @@ -199,7 +199,7 @@ StoragePlan GetPropProcessor::buildTagPlan(RuntimeContext* context, tags.emplace_back(tag.get()); plan.addNode(std::move(tag)); } - auto output = std::make_unique(context, tags, result); + auto output = std::make_unique(context, tags, result, filter_); for (auto* tag : tags) { output->addDependency(tag); } @@ -216,7 +216,7 @@ StoragePlan GetPropProcessor::buildEdgePlan(RuntimeContext* conte edges.emplace_back(edge.get()); plan.addNode(std::move(edge)); } - auto output = std::make_unique(context, edges, result); + auto output = std::make_unique(context, edges, result, filter_); for (auto* edge : edges) { output->addDependency(edge); } @@ -248,14 +248,28 @@ nebula::cpp2::ErrorCode GetPropProcessor::checkAndBuildContexts(const cpp2::GetP if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - return buildTagContext(req); + code = buildTagContext(req); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } } else { code = getSpaceEdgeSchema(); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - return buildEdgeContext(req); + code = buildEdgeContext(req); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } } + code = buildFilter(req, [](const cpp2::GetPropRequest& r) -> const std::string* { + if (r.filter_ref().has_value()) { + return r.get_filter(); + } else { + return nullptr; + } + }); + return code; } nebula::cpp2::ErrorCode GetPropProcessor::buildTagContext(const 
cpp2::GetPropRequest& req) { diff --git a/src/storage/test/GetPropTest.cpp b/src/storage/test/GetPropTest.cpp index eb9e3a600dc..1033c44cdbb 100644 --- a/src/storage/test/GetPropTest.cpp +++ b/src/storage/test/GetPropTest.cpp @@ -16,10 +16,14 @@ namespace storage { cpp2::GetPropRequest buildVertexRequest( int32_t totalParts, const std::vector& vertices, - const std::vector>>& tags) { + const std::vector>>& tags, + Expression* filter = nullptr) { std::hash hash; cpp2::GetPropRequest req; req.space_id_ref() = 1; + if (filter != nullptr) { + req.filter_ref() = Expression::encode(*filter); + } for (const auto& vertex : vertices) { PartitionID partId = (hash(vertex) % totalParts) + 1; nebula::Row row; @@ -48,9 +52,13 @@ cpp2::GetPropRequest buildVertexRequest( cpp2::GetPropRequest buildEdgeRequest( int32_t totalParts, const std::vector& edgeKeys, - const std::vector>>& edges) { + const std::vector>>& edges, + Expression* filter = nullptr) { cpp2::GetPropRequest req; req.space_id_ref() = 1; + if (filter != nullptr) { + req.filter_ref() = Expression::encode(*filter); + } for (const auto& edge : edgeKeys) { PartitionID partId = (std::hash()(edge.get_src()) % totalParts) + 1; nebula::Row row; @@ -703,6 +711,95 @@ TEST(QueryVertexPropsTest, PrefixBloomFilterTest) { FLAGS_enable_rocksdb_prefix_filtering = false; } +TEST(GetPropTest, FilterTest) { + fs::TempDir rootPath("/tmp/GetPropTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + EdgeType serve = 101; + + ObjectPool pool; + + // vertex + { + LOG(INFO) << "GetVertexPropInValue"; + std::vector vertices = {"Tim Duncan", "Tony Parker"}; + std::vector>> tags; + tags.emplace_back(player, std::vector{"name", "age", "avgScore"}); + // 1.age == 44 + 
Expression* filter = RelationalExpression::makeEQ( + &pool, + TagPropertyExpression::make(&pool, std::to_string(player), "age"), + ConstantExpression::make(&pool, 44)); + auto req = buildVertexRequest(totalParts, vertices, tags, filter); + + auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size()); + nebula::DataSet expected; + expected.colNames = {kVid, "1.name", "1.age", "1.avgScore"}; + expected.emplace_back(Row({"Tim Duncan", "Tim Duncan", 44, 19.0})); + // Filtered + // expected.emplace_back(Row({"Tony Parker", "Tony Parker", 38, 15.5})); + ASSERT_EQ(expected, *resp.props_ref()); + } + // edge + { + std::vector edgeKeys; + { + cpp2::EdgeKey edgeKey; + edgeKey.src_ref() = "Tim Duncan"; + edgeKey.edge_type_ref() = 101; + edgeKey.ranking_ref() = 1997; + edgeKey.dst_ref() = "Spurs"; + edgeKeys.emplace_back(std::move(edgeKey)); + } + { + cpp2::EdgeKey edgeKey; + edgeKey.src_ref() = "Tony Parker"; + edgeKey.edge_type_ref() = 101; + edgeKey.ranking_ref() = 2001; + edgeKey.dst_ref() = "Spurs"; + edgeKeys.emplace_back(std::move(edgeKey)); + } + std::vector>> edges; + edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); + // Filter 101.startYear == 2001 + Expression* filter = RelationalExpression::makeEQ( + &pool, + EdgePropertyExpression::make(&pool, std::to_string(serve), "startYear"), + ConstantExpression::make(&pool, 2001)); + auto req = buildEdgeRequest(totalParts, edgeKeys, edges, filter); + + auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size()); + nebula::DataSet expected; + expected.colNames = {"101.teamName", "101.startYear", "101.endYear"}; + { + nebula::Row row({"Spurs", 2001, 2018}); + 
expected.rows.emplace_back(std::move(row)); + } + // Filtered + // { + // nebula::Row row({"Spurs", 1997, 2016}); + // expected.rows.emplace_back(std::move(row)); + // } + ASSERT_EQ(expected, *resp.props_ref()); + } +} + } // namespace storage } // namespace nebula From 5aefca89c7910f890d441dc55febc2efdf89f982 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Wed, 9 Feb 2022 10:50:51 +0800 Subject: [PATCH 2/2] Combine logic to function. --- src/storage/exec/GetPropNode.h | 38 +++++++-------------------------- src/storage/exec/HashJoinNode.h | 23 +++++++++----------- src/storage/exec/QueryUtils.h | 37 +++++++++++++++++++++----------- 3 files changed, 43 insertions(+), 55 deletions(-) diff --git a/src/storage/exec/GetPropNode.h b/src/storage/exec/GetPropNode.h index dd96a24d032..271ebf01e2f 100644 --- a/src/storage/exec/GetPropNode.h +++ b/src/storage/exec/GetPropNode.h @@ -78,21 +78,10 @@ class GetTagPropNode : public QueryNode { folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - for (const auto& prop : *props) { - if (!(prop.returned_ || (prop.filtered_ && expCtx_ != nullptr))) { - continue; - } - auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); - if (!value.ok()) { - return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; - } - if (prop.returned_) { - VLOG(2) << "Collect prop " << prop.name_; - row.emplace_back(value.value()); - } - if (prop.filtered_ && expCtx_ != nullptr) { - expCtx_->setTagProp(tagNode->getTagName(), prop.name_, std::move(value).value()); - } + auto status = QueryUtils::collectVertexProps( + key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName()); + if (!status.ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; } return nebula::cpp2::ErrorCode::SUCCEEDED; }); @@ -161,21 +150,10 @@ class GetEdgePropNode : public QueryNode { folly::StringPiece key, RowReader* reader, const std::vector* props) 
-> nebula::cpp2::ErrorCode { - for (const auto& prop : *props) { - if (!(prop.returned_ || (prop.filtered_ && expCtx_ != nullptr))) { - continue; - } - auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); - if (!value.ok()) { - return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; - } - if (prop.returned_) { - VLOG(2) << "Collect prop " << prop.name_; - row.emplace_back(value.value()); - } - if (prop.filtered_ && expCtx_ != nullptr) { - expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, std::move(value).value()); - } + auto status = QueryUtils::collectEdgeProps( + key, vIdLen, isIntId, reader, props, row, expCtx_.get(), edgeNode->getEdgeName()); + if (!status.ok()) { + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; } return nebula::cpp2::ErrorCode::SUCCEEDED; }); diff --git a/src/storage/exec/HashJoinNode.h b/src/storage/exec/HashJoinNode.h index 15f512761cc..e12caf5c6de 100644 --- a/src/storage/exec/HashJoinNode.h +++ b/src/storage/exec/HashJoinNode.h @@ -72,19 +72,16 @@ class HashJoinNode : public IterateNode { nebula::List list; list.reserve(props->size()); const auto& tagName = tagNode->getTagName(); - for (const auto& prop : *props) { - VLOG(2) << "Collect prop " << prop.name_; - auto value = QueryUtils::readVertexProp( - key, context_->vIdLen(), context_->isIntId(), reader, prop); - if (!value.ok()) { - return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; - } - if (prop.filtered_ && expCtx_ != nullptr) { - expCtx_->setTagProp(tagName, prop.name_, value.value()); - } - if (prop.returned_) { - list.emplace_back(std::move(value).value()); - } + auto status = QueryUtils::collectVertexProps(key, + context_->vIdLen(), + context_->isIntId(), + reader, + props, + list, + expCtx_, + tagName); + if (!status.ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; } result.values.emplace_back(std::move(list)); return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/storage/exec/QueryUtils.h b/src/storage/exec/QueryUtils.h 
index b595fea00b5..ddce512ef42 100644 --- a/src/storage/exec/QueryUtils.h +++ b/src/storage/exec/QueryUtils.h @@ -10,6 +10,7 @@ #include "common/expression/Expression.h" #include "common/utils/DefaultValueContext.h" #include "storage/CommonUtils.h" +#include "storage/context/StorageExpressionContext.h" #include "storage/query/QueryBaseProcessor.h" namespace nebula { @@ -169,15 +170,21 @@ class QueryUtils final { bool isIntId, RowReader* reader, const std::vector* props, - nebula::List& list) { + nebula::List& list, + StorageExpressionContext* expCtx = nullptr, + const std::string& tagName = "") { for (const auto& prop : *props) { + if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) { + continue; + } + auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); + NG_RETURN_IF_ERROR(value); if (prop.returned_) { VLOG(2) << "Collect prop " << prop.name_; - auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); - if (!value.ok()) { - return value.status(); - } - list.emplace_back(std::move(value).value()); + list.emplace_back(value.value()); + } + if (prop.filtered_ && expCtx != nullptr) { + expCtx->setTagProp(tagName, prop.name_, std::move(value).value()); } } return Status::OK(); @@ -188,15 +195,21 @@ class QueryUtils final { bool isIntId, RowReader* reader, const std::vector* props, - nebula::List& list) { + nebula::List& list, + StorageExpressionContext* expCtx = nullptr, + const std::string& edgeName = "") { for (const auto& prop : *props) { + if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) { + continue; + } + auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); + NG_RETURN_IF_ERROR(value); if (prop.returned_) { VLOG(2) << "Collect prop " << prop.name_; - auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); - if (!value.ok()) { - return value.status(); - } - list.emplace_back(std::move(value).value()); + list.emplace_back(value.value()); + } + if 
(prop.filtered_ && expCtx != nullptr) { + expCtx->setEdgeProp(edgeName, prop.name_, std::move(value).value()); } } return Status::OK();