diff --git a/src/storage/exec/GetPropNode.h b/src/storage/exec/GetPropNode.h index 5e08c5d881a..7e79d3479df 100644 --- a/src/storage/exec/GetPropNode.h +++ b/src/storage/exec/GetPropNode.h @@ -20,10 +20,15 @@ class GetTagPropNode : public QueryNode { GetTagPropNode(RuntimeContext* context, std::vector tagNodes, nebula::DataSet* resultDataSet, + Expression* filter, std::size_t limit) : context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet), + expCtx_(filter == nullptr + ? nullptr + : new StorageExpressionContext(context->vIdLen(), context->isIntId())), + filter_(filter), limit_(limit) { name_ = "GetTagPropNode"; } @@ -63,19 +68,24 @@ class GetTagPropNode : public QueryNode { auto isIntId = context_->isIntId(); for (auto* tagNode : tagNodes_) { ret = tagNode->collectTagPropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + [&row, tagNode, this](const std::vector* props) -> nebula::cpp2::ErrorCode { for (const auto& prop : *props) { if (prop.returned_) { row.emplace_back(Value()); } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }, - [&row, vIdLen, isIntId]( + [&row, vIdLen, isIntId, tagNode, this]( folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) { + auto status = QueryUtils::collectVertexProps( + key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName()); + if (!status.ok()) { return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; } return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -84,7 +94,12 @@ class GetTagPropNode : public QueryNode { return ret; } } - resultDataSet_->rows.emplace_back(std::move(row)); + if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + if (expCtx_ != nullptr) { + expCtx_->clear(); + } return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -92,6 +107,8 @@ class GetTagPropNode : public QueryNode { RuntimeContext* context_; std::vector tagNodes_; nebula::DataSet* resultDataSet_; + std::unique_ptr expCtx_{nullptr}; + Expression* filter_{nullptr}; const std::size_t limit_{std::numeric_limits::max()}; }; @@ -102,10 +119,15 @@ class GetEdgePropNode : public QueryNode { GetEdgePropNode(RuntimeContext* context, std::vector*> edgeNodes, nebula::DataSet* resultDataSet, + Expression* filter, std::size_t limit) : context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet), + expCtx_(filter == nullptr + ? nullptr + : new StorageExpressionContext(context->vIdLen(), context->isIntId())), + filter_(filter), limit_(limit) { QueryNode::name_ = "GetEdgePropNode"; } @@ -124,20 +146,25 @@ class GetEdgePropNode : public QueryNode { auto isIntId = context_->isIntId(); for (auto* edgeNode : edgeNodes_) { ret = edgeNode->collectEdgePropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + [&row, edgeNode, this](const std::vector* props) -> nebula::cpp2::ErrorCode { for (const auto& prop : *props) { if (prop.returned_) { row.emplace_back(Value()); } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }, - [&row, vIdLen, isIntId]( + [&row, vIdLen, isIntId, edgeNode, this]( folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { - return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + auto status = QueryUtils::collectEdgeProps( + key, vIdLen, isIntId, reader, props, row, expCtx_.get(), edgeNode->getEdgeName()); + if (!status.ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; } return nebula::cpp2::ErrorCode::SUCCEEDED; }); @@ -145,7 +172,12 @@ class GetEdgePropNode : public QueryNode { return ret; } } - resultDataSet_->rows.emplace_back(std::move(row)); + if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + if (expCtx_ != nullptr) { + expCtx_->clear(); + } return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -153,6 +185,8 @@ class GetEdgePropNode : public QueryNode { RuntimeContext* context_; std::vector*> edgeNodes_; nebula::DataSet* resultDataSet_; + std::unique_ptr expCtx_{nullptr}; + Expression* filter_{nullptr}; const std::size_t limit_{std::numeric_limits::max()}; }; diff --git a/src/storage/exec/HashJoinNode.h b/src/storage/exec/HashJoinNode.h index 15f512761cc..e12caf5c6de 100644 --- a/src/storage/exec/HashJoinNode.h +++ b/src/storage/exec/HashJoinNode.h @@ -72,19 +72,16 @@ class HashJoinNode : public IterateNode { nebula::List list; list.reserve(props->size()); const auto& tagName = tagNode->getTagName(); - for (const auto& prop : *props) { - VLOG(2) << "Collect prop " << prop.name_; - auto value = QueryUtils::readVertexProp( - key, context_->vIdLen(), context_->isIntId(), reader, prop); - if (!value.ok()) { - return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; - } - if (prop.filtered_ && expCtx_ != nullptr) { - expCtx_->setTagProp(tagName, prop.name_, value.value()); - } - if (prop.returned_) { - list.emplace_back(std::move(value).value()); - } + auto status = QueryUtils::collectVertexProps(key, + context_->vIdLen(), + context_->isIntId(), + reader, + props, + list, + expCtx_, + tagName); + if (!status.ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; } result.values.emplace_back(std::move(list)); return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/storage/exec/QueryUtils.h b/src/storage/exec/QueryUtils.h index 948235802b9..ddce512ef42 100644 --- a/src/storage/exec/QueryUtils.h +++ b/src/storage/exec/QueryUtils.h @@ -10,6 +10,7 @@ #include "common/expression/Expression.h" #include "common/utils/DefaultValueContext.h" #include "storage/CommonUtils.h" +#include "storage/context/StorageExpressionContext.h" #include "storage/query/QueryBaseProcessor.h" namespace nebula { @@ -17,6 +18,10 @@ namespace storage { class QueryUtils final { public: + static inline bool vTrue(const Value& v) { + return v.isBool() && v.getBool(); + } + enum class ReturnColType : uint16_t { kVid, kTag, @@ -165,15 +170,21 @@ class QueryUtils final { bool isIntId, RowReader* reader, const std::vector* props, - nebula::List& list) { + nebula::List& list, + StorageExpressionContext* expCtx = nullptr, + const std::string& tagName = "") { for (const auto& prop : *props) { + if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) { + continue; + } + auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); + NG_RETURN_IF_ERROR(value); if (prop.returned_) { VLOG(2) << "Collect prop " << prop.name_; - auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); - if (!value.ok()) { - return value.status(); - } - list.emplace_back(std::move(value).value()); + list.emplace_back(value.value()); + } + if (prop.filtered_ && expCtx != nullptr) { + expCtx->setTagProp(tagName, prop.name_, std::move(value).value()); } } return Status::OK(); @@ -184,15 +195,21 @@ class QueryUtils final { bool isIntId, RowReader* reader, const std::vector* props, - nebula::List& list) { + nebula::List& list, + StorageExpressionContext* expCtx = nullptr, + const std::string& edgeName = "") { for (const auto& prop : *props) { + if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) { + continue; + } + auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); + NG_RETURN_IF_ERROR(value); if (prop.returned_) { VLOG(2) << "Collect prop " << prop.name_; - auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); - if (!value.ok()) { - return value.status(); - } - list.emplace_back(std::move(value).value()); + list.emplace_back(value.value()); + } + if (prop.filtered_ && expCtx != nullptr) { + expCtx->setEdgeProp(edgeName, prop.name_, std::move(value).value()); } } return Status::OK(); diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 7420b50c9d8..c5157d4a462 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -14,10 +14,6 @@ namespace storage { using Cursor = std::string; -inline bool vTrue(const Value& v) { - return v.isBool() && v.getBool(); -} - // Node to scan vertices of one partition class ScanVertexPropNode : public QueryNode { public: @@ -154,7 +150,7 @@ class ScanVertexPropNode : public QueryNode { } } if (ret == nebula::cpp2::ErrorCode::SUCCEEDED && - (filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) { + (filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) { resultDataSet_->rows.emplace_back(std::move(row)); } expCtx_->clear(); @@ -295,7 +291,7 @@ class ScanEdgePropNode : public QueryNode { } } if (ret == nebula::cpp2::ErrorCode::SUCCEEDED && - (filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) { + (filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) { resultDataSet_->rows.emplace_back(std::move(row)); } expCtx_->clear(); diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp index 6cf82ef9982..046f25052fe 100644 --- a/src/storage/query/GetPropProcessor.cpp +++ b/src/storage/query/GetPropProcessor.cpp @@ -202,7 +202,7 @@ StoragePlan GetPropProcessor::buildTagPlan(RuntimeContext* context, tags.emplace_back(tag.get()); plan.addNode(std::move(tag)); } - auto output = std::make_unique(context, tags, result, limit_); + auto output = std::make_unique(context, tags, result, filter_, limit_); for (auto* tag : tags) { output->addDependency(tag); } @@ -219,7 +219,7 @@ StoragePlan GetPropProcessor::buildEdgePlan(RuntimeContext* conte edges.emplace_back(edge.get()); plan.addNode(std::move(edge)); } - auto output = std::make_unique(context, edges, result, limit_); + auto output = std::make_unique(context, edges, result, filter_, limit_); for (auto* edge : edges) { output->addDependency(edge); } @@ -251,14 +251,28 @@ nebula::cpp2::ErrorCode GetPropProcessor::checkAndBuildContexts(const cpp2::GetP if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - return buildTagContext(req); + code = buildTagContext(req); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } } else { code = getSpaceEdgeSchema(); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - return buildEdgeContext(req); + code = buildEdgeContext(req); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + return code; + } } + code = buildFilter(req, [](const cpp2::GetPropRequest& r) -> const std::string* { + if (r.filter_ref().has_value()) { + return r.get_filter(); + } else { + return nullptr; + } + }); + return code; } nebula::cpp2::ErrorCode GetPropProcessor::buildTagContext(const cpp2::GetPropRequest& req) { diff --git a/src/storage/test/GetPropTest.cpp b/src/storage/test/GetPropTest.cpp index f4cadb2d370..06bf8f5a0bd 100644 --- a/src/storage/test/GetPropTest.cpp +++ b/src/storage/test/GetPropTest.cpp @@ -17,10 +17,14 @@ cpp2::GetPropRequest buildVertexRequest( int32_t totalParts, const std::vector& vertices, const std::vector>>& tags, + Expression* filter = nullptr, int64_t limit = -1) { std::hash hash; cpp2::GetPropRequest req; req.space_id_ref() = 1; + if (filter != nullptr) { + req.filter_ref() = Expression::encode(*filter); + } req.limit_ref() = limit; for (const auto& vertex : vertices) { PartitionID partId = (hash(vertex) % totalParts) + 1; @@ -51,9 +55,13 @@ cpp2::GetPropRequest buildEdgeRequest( int32_t totalParts, const std::vector& edgeKeys, const std::vector>>& edges, + Expression* filter = nullptr, int64_t limit = -1) { cpp2::GetPropRequest req; req.space_id_ref() = 1; + if (filter != nullptr) { + req.filter_ref() = Expression::encode(*filter); + } req.limit_ref() = limit; for (const auto& edge : edgeKeys) { PartitionID partId = (std::hash()(edge.get_src()) % totalParts) + 1; @@ -707,6 +715,95 @@ TEST(QueryVertexPropsTest, PrefixBloomFilterTest) { FLAGS_enable_rocksdb_prefix_filtering = false; } +TEST(GetPropTest, FilterTest) { + fs::TempDir rootPath("/tmp/GetPropTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + EdgeType serve = 101; + + ObjectPool pool; + + // vertex + { + LOG(INFO) << "GetVertexPropInValue"; + std::vector vertices = {"Tim Duncan", "Tony Parker"}; + std::vector>> tags; + tags.emplace_back(player, std::vector{"name", "age", "avgScore"}); + // 1.age == 44 + Expression* filter = RelationalExpression::makeEQ( + &pool, + TagPropertyExpression::make(&pool, std::to_string(player), "age"), + ConstantExpression::make(&pool, 44)); + auto req = buildVertexRequest(totalParts, vertices, tags, filter); + + auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size()); + nebula::DataSet expected; + expected.colNames = {kVid, "1.name", "1.age", "1.avgScore"}; + expected.emplace_back(Row({"Tim Duncan", "Tim Duncan", 44, 19.0})); + // Filtered + // expected.emplace_back(Row({"Tony Parker", "Tony Parker", 38, 15.5})); + ASSERT_EQ(expected, *resp.props_ref()); + } + // edge + { + std::vector edgeKeys; + { + cpp2::EdgeKey edgeKey; + edgeKey.src_ref() = "Tim Duncan"; + edgeKey.edge_type_ref() = 101; + edgeKey.ranking_ref() = 1997; + edgeKey.dst_ref() = "Spurs"; + edgeKeys.emplace_back(std::move(edgeKey)); + } + { + cpp2::EdgeKey edgeKey; + edgeKey.src_ref() = "Tony Parker"; + edgeKey.edge_type_ref() = 101; + edgeKey.ranking_ref() = 2001; + edgeKey.dst_ref() = "Spurs"; + edgeKeys.emplace_back(std::move(edgeKey)); + } + std::vector>> edges; + edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); + // Filter 101.startYear == 2001 + Expression* filter = RelationalExpression::makeEQ( + &pool, + EdgePropertyExpression::make(&pool, std::to_string(serve), "startYear"), + ConstantExpression::make(&pool, 2001)); + auto req = buildEdgeRequest(totalParts, edgeKeys, edges, filter); + + auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size()); + nebula::DataSet expected; + expected.colNames = {"101.teamName", "101.startYear", "101.endYear"}; + { + nebula::Row row({"Spurs", 2001, 2018}); + expected.rows.emplace_back(std::move(row)); + } + // Filtered + // { + // nebula::Row row({"Spurs", 1997, 2016}); + // expected.rows.emplace_back(std::move(row)); + // } + ASSERT_EQ(expected, *resp.props_ref()); + } +} + TEST(GetPropTest, LimitTest) { fs::TempDir rootPath("/tmp/GetPropTest.XXXXXX"); mock::MockCluster cluster; @@ -724,7 +821,7 @@ TEST(GetPropTest, LimitTest) { std::vector vertices = {"Tim Duncan"}; std::vector>> tags; tags.emplace_back(player, std::vector{"name", "age", "avgScore"}); - auto req = buildVertexRequest(totalParts, vertices, tags, 0); + auto req = buildVertexRequest(totalParts, vertices, tags, nullptr, 0); auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); auto fut = processor->getFuture(); @@ -740,7 +837,7 @@ TEST(GetPropTest, LimitTest) { std::vector vertices = {"Tim Duncan", "Tony Parker"}; std::vector>> tags; tags.emplace_back(player, std::vector{"name", "age", "avgScore"}); - auto req = buildVertexRequest(totalParts, vertices, tags, 1); + auto req = buildVertexRequest(totalParts, vertices, tags, nullptr, 1); auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); auto fut = processor->getFuture(); @@ -754,7 +851,7 @@ TEST(GetPropTest, LimitTest) { std::vector vertices = {"Tim Duncan", "Tony Parker"}; std::vector>> tags; tags.emplace_back(player, std::vector{"name", "age", "avgScore"}); - auto req = buildVertexRequest(totalParts, vertices, tags, 3); + auto req = buildVertexRequest(totalParts, vertices, tags, nullptr, 3); auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); auto fut = processor->getFuture(); @@ -788,7 +885,7 @@ TEST(GetPropTest, LimitTest) { } std::vector>> edges; edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); - auto req = buildEdgeRequest(totalParts, edgeKeys, edges, 0); + auto req = buildEdgeRequest(totalParts, edgeKeys, edges, nullptr, 0); auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); auto fut = processor->getFuture(); @@ -820,7 +917,7 @@ TEST(GetPropTest, LimitTest) { } std::vector>> edges; edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); - auto req = buildEdgeRequest(totalParts, edgeKeys, edges, 1); + auto req = buildEdgeRequest(totalParts, edgeKeys, edges, nullptr, 1); auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); auto fut = processor->getFuture(); @@ -850,7 +947,7 @@ TEST(GetPropTest, LimitTest) { } std::vector>> edges; edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); - auto req = buildEdgeRequest(totalParts, edgeKeys, edges, 3); + auto req = buildEdgeRequest(totalParts, edgeKeys, edges, nullptr, 3); auto* processor = GetPropProcessor::instance(env, nullptr, nullptr); auto fut = processor->getFuture();