From c56d7534b1899577dcf215913b7dbaa8ecf0df5f Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Wed, 3 Nov 2021 10:54:35 +0800 Subject: [PATCH 01/10] Scan multiple parts. --- src/clients/storage/GraphStorageClient.cpp | 80 +++++--- src/clients/storage/GraphStorageClient.h | 14 +- src/clients/storage/StorageClientBase-inl.h | 23 +++ src/clients/storage/StorageClientBase.h | 11 +- src/interface/storage.thrift | 58 +++--- src/storage/exec/RelNode.h | 2 + src/storage/exec/ScanNode.h | 205 ++++++++++++++++++++ src/storage/query/ScanEdgeProcessor.cpp | 176 +++++++++++------ src/storage/query/ScanEdgeProcessor.h | 25 ++- src/storage/query/ScanVertexProcessor.cpp | 165 ++++++++++------ src/storage/query/ScanVertexProcessor.h | 25 ++- src/storage/test/ScanEdgeTest.cpp | 20 +- src/storage/test/ScanVertexTest.cpp | 57 ++++-- 13 files changed, 651 insertions(+), 210 deletions(-) create mode 100644 src/storage/exec/ScanNode.h diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/GraphStorageClient.cpp index 8bda62fb181..e4400c0e9fb 100644 --- a/src/clients/storage/GraphStorageClient.cpp +++ b/src/clients/storage/GraphStorageClient.cpp @@ -560,36 +560,68 @@ StorageRpcRespFuture GraphStorageClient::lookupAndTr }); } -folly::Future> GraphStorageClient::scanEdge( - cpp2::ScanEdgeRequest req, folly::EventBase* evb) { - std::pair request; - auto host = this->getLeader(req.get_space_id(), req.get_part_id()); - if (!host.ok()) { - return folly::makeFuture>(host.status()); +StorageRpcRespFuture GraphStorageClient::scanEdge( + const CommonRequestParam& param, + const cpp2::EdgeProp& edgeProp, + int64_t limit, + const Expression* filter) { + std::unordered_map requests; + auto status = getHostPartsWithCursor(param.space); + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + auto& clusters = status.value(); + for (const auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(param.space); + req.set_parts(std::move(c.second)); + req.set_return_columns(edgeProp); + req.set_limit(limit); + if (filter != nullptr) { + req.set_filter(filter->encode()); + } + req.set_common(param.toReqCommon()); } - request.first = std::move(host).value(); - request.second = std::move(req); - return getResponse(evb, - std::move(request), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); }); + return collectResponse(param.evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); }); } -folly::Future> GraphStorageClient::scanVertex( - cpp2::ScanVertexRequest req, folly::EventBase* evb) { - std::pair request; - auto host = this->getLeader(req.get_space_id(), req.get_part_id()); - if (!host.ok()) { - return folly::makeFuture>(host.status()); +StorageRpcRespFuture GraphStorageClient::scanVertex( + const CommonRequestParam& param, + const cpp2::VertexProp& vertexProp, + int64_t limit, + const Expression* filter) { + std::unordered_map requests; + auto status = getHostPartsWithCursor(param.space); + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + auto& clusters = status.value(); + for (const auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(param.space); + req.set_parts(std::move(c.second)); + req.set_return_columns(vertexProp); + req.set_limit(limit); + if (filter != nullptr) { + req.set_filter(filter->encode()); + } + req.set_common(param.toReqCommon()); } - request.first = std::move(host).value(); - request.second = std::move(req); - return getResponse(evb, - std::move(request), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::ScanVertexRequest& r) { return client->future_scanVertex(r); }); + return collectResponse( + param.evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::ScanVertexRequest& r) { + return client->future_scanVertex(r); + }); } StatusOr> GraphStorageClient::getIdFromRow( diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/GraphStorageClient.h index 80f25ec3bbc..ddb1b5b7d78 100644 --- a/src/clients/storage/GraphStorageClient.h +++ b/src/clients/storage/GraphStorageClient.h @@ -131,11 +131,15 @@ class GraphStorageClient : public StorageClientBase lookupAndTraverse( const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec); - folly::Future> scanEdge(cpp2::ScanEdgeRequest req, - folly::EventBase* evb = nullptr); - - folly::Future> scanVertex(cpp2::ScanVertexRequest req, - folly::EventBase* evb = nullptr); + StorageRpcRespFuture scanEdge(const CommonRequestParam& param, + const cpp2::EdgeProp& vertexProp, + int64_t limit, + const Expression* filter); + + StorageRpcRespFuture scanVertex(const CommonRequestParam& param, + const cpp2::VertexProp& vertexProp, + int64_t limit, + const Expression* filter); private: StatusOr> getIdFromRow(GraphSpaceID space, diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 65eb546b381..e62eb214356 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -330,5 +330,28 @@ StorageClientBase::getHostParts(GraphSpaceID spaceId) const { return hostParts; } +template +StatusOr>> +StorageClientBase::getHostPartsWithCursor(GraphSpaceID spaceId) const { + std::unordered_map> hostParts; + auto status = metaClient_->partsNum(spaceId); + if (!status.ok()) { + return Status::Error("Space not found, spaceid: %d", spaceId); + } + + // TODO support cursor + cpp2::ScanCursor c; + c.set_has_next(false); + auto parts = status.value(); + for (auto partId = 1; partId <= parts; partId++) { + auto leader = getLeader(spaceId, partId); + if (!leader.ok()) { + return leader.status(); + } + hostParts[leader.value()].emplace(partId, c); + } + return hostParts; +} + } // namespace storage } // namespace nebula diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index 2a2538a3a3e..95e4c7574e6 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -167,6 +167,9 @@ class StorageClientBase { std::unordered_map>>> clusterIdsToHosts(GraphSpaceID spaceId, const Container& ids, GetIdFunc f) const; + StatusOr>> + getHostPartsWithCursor(GraphSpaceID spaceId) const; + virtual StatusOr getPartHosts(GraphSpaceID spaceId, PartitionID partId) const { CHECK(metaClient_ != nullptr); return metaClient_->getPartHostsFromCache(spaceId, partId); @@ -209,14 +212,6 @@ class StorageClientBase { return {req.get_part_id()}; } - std::vector getReqPartsId(const cpp2::ScanEdgeRequest& req) const { - return {req.get_part_id()}; - } - - std::vector getReqPartsId(const cpp2::ScanVertexRequest& req) const { - return {req.get_part_id()}; - } - bool isValidHostPtr(const HostAddr* addr) { return addr != nullptr && !addr->host.empty() && addr->port != 0; } diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 2c61e5f3648..fcd275e30a3 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -557,24 +557,29 @@ struct LookupAndTraverseRequest { * End of Index section */ +struct ScanCursor { + 3: bool has_next, + // next start key of scan, only valid when has_next is true + 4: optional binary next_cursor, +} + struct ScanVertexRequest { 1: common.GraphSpaceID space_id, - 2: common.PartitionID part_id, - // start key of this block - 3: optional binary cursor, - 4: VertexProp return_columns, + 2: map (cpp.template = "std::unordered_map") + parts, + 3: VertexProp return_columns, // max row count of tag in this response - 5: i64 limit, + 4: i64 limit, // only return data in time range [start_time, end_time) - 6: optional i64 start_time, - 7: optional i64 end_time, - 8: optional binary filter, + 5: optional i64 start_time, + 6: optional i64 end_time, + 7: optional binary filter, // when storage enable multi versions and only_latest_version is true, only return latest version. // when storage disable multi versions, just use the default value. - 9: bool only_latest_version = false, + 8: bool only_latest_version = false, // if set to false, forbid follower read - 10: bool enable_read_from_follower = true, - 11: optional RequestCommon common, + 9: bool enable_read_from_follower = true, + 10: optional RequestCommon common, } struct ScanVertexResponse { @@ -583,29 +588,27 @@ struct ScanVertexResponse { // Each column represents one property. the column name is in the form of "tag_name.prop_alias" // in the same order which specified in VertexProp in request. 2: common.DataSet vertex_data, - 3: bool has_next, - // next start key of scan, only valid when has_next is true - 4: optional binary next_cursor, + 3: map (cpp.template = "std::unordered_map") + cursors; } struct ScanEdgeRequest { 1: common.GraphSpaceID space_id, - 2: common.PartitionID part_id, - // start key of this block - 3: optional binary cursor, - 4: EdgeProp return_columns, + 2: map (cpp.template = "std::unordered_map") + parts, + 3: EdgeProp return_columns, // max row count of edge in this response - 5: i64 limit, + 4: i64 limit, // only return data in time range [start_time, end_time) - 6: optional i64 start_time, - 7: optional i64 end_time, - 8: optional binary filter, + 5: optional i64 start_time, + 6: optional i64 end_time, + 7: optional binary filter, // when storage enable multi versions and only_latest_version is true, only return latest version. // when storage disable multi versions, just use the default value. - 9: bool only_latest_version = false, + 8: bool only_latest_version = false, // if set to false, forbid follower read - 10: bool enable_read_from_follower = true, - 11: optional RequestCommon common, + 9: bool enable_read_from_follower = true, + 10: optional RequestCommon common, } struct ScanEdgeResponse { @@ -614,9 +617,8 @@ struct ScanEdgeResponse { // Each column represents one property. the column name is in the form of "edge_name.prop_alias" // in the same order which specified in EdgeProp in requesss. 2: common.DataSet edge_data, - 3: bool has_next, - // next start key of scan, only valid when has_next is true - 4: optional binary next_cursor, + 3: map (cpp.template = "std::unordered_map") + cursors; } struct TaskPara { diff --git a/src/storage/exec/RelNode.h b/src/storage/exec/RelNode.h index f493352f23e..e41a4618fae 100644 --- a/src/storage/exec/RelNode.h +++ b/src/storage/exec/RelNode.h @@ -78,6 +78,8 @@ class RelNode { explicit RelNode(const std::string& name) : name_(name) {} + const std::string& name() const { return name_; } + std::string name_ = "RelNode"; std::vector*> dependencies_; bool hasDependents_ = false; diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h new file mode 100644 index 00000000000..058055bc2d0 --- /dev/null +++ b/src/storage/exec/ScanNode.h @@ -0,0 +1,205 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#pragma once + +#include + +#include "common/base/Base.h" +#include "storage/exec/GetPropNode.h" + +namespace nebula { +namespace storage { + +using Cursor = std::string; + +// Node to scan vertices of one partition +class ScanVertexPropNode : public QueryNode { + public: + using RelNode::doExecute; + + explicit ScanVertexPropNode(RuntimeContext* context, + std::vector> tagNodes, + bool enableReadFollower, + int64_t limit, + std::unordered_map* cursors, + nebula::DataSet* resultDataSet) + : context_(context), + tagNodes_(std::move(tagNodes)), + enableReadFollower_(enableReadFollower), + limit_(limit), + cursors_(cursors) { + name_ = "ScanVertexPropNode"; + std::vector tags; + for (const auto& t : tagNodes_) { + tags.emplace_back(t.get()); + } + node_ = std::make_unique(context, tags, resultDataSet); + for (auto* tag : tags) { + node_->addDependency(tag); + } + } + + nebula::cpp2::ErrorCode doExecute(PartitionID partId, const Cursor& cursor) override { + auto ret = RelNode::doExecute(partId); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + + std::string start; + std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + if (cursor.empty()) { + start = prefix; + } else { + start = cursor; + } + + std::unique_ptr iter; + auto kvRet = context_->env()->kvstore_->rangeWithPrefix( + context_->planContext_->spaceId_, partId, start, prefix, &iter, enableReadFollower_); + if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + return kvRet; + } + + const auto rowLimit = limit_; + RowReaderWrapper reader; + std::string currentVertexId; + for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { + auto key = iter->key(); + + auto vertexId = NebulaKeyUtils::getVertexId(context_->vIdLen(), key); + if (vertexId != currentVertexId) { + currentVertexId = vertexId; + } else { + // skip same vertex id tag key + continue; + } + node_->doExecute(partId, vertexId.subpiece(0, vertexId.find_first_of('\0')).toString()); + rowCount++; + } + + cpp2::ScanCursor c; + if (iter->valid()) { + c.set_has_next(true); + c.set_next_cursor(iter->key().str()); + } else { + c.set_has_next(false); + } + cursors_->emplace(partId, std::move(c)); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + private: + RuntimeContext* context_; + std::unique_ptr node_; + std::vector> tagNodes_; + bool enableReadFollower_; + int64_t limit_; + // cursors for next scan + std::unordered_map* cursors_; +}; + +// Node to scan edge of one partition +class ScanEdgePropNode : public QueryNode { + public: + using RelNode::doExecute; + + ScanEdgePropNode(RuntimeContext* context, + std::unordered_set edgeTypes, + std::vector>> edgeNodes, + bool enableReadFollower, + int64_t limit, + std::unordered_map* cursors, + nebula::DataSet* resultDataSet) + : context_(context), + edgeTypes_(std::move(edgeTypes)), + edgeNodes_(std::move(edgeNodes)), + enableReadFollower_(enableReadFollower), + limit_(limit), + cursors_(cursors) { + QueryNode::name_ = "ScanEdgePropNode"; + std::vector*> edges; + for (const auto& e : edgeNodes_) { + edges.emplace_back(e.get()); + } + node_ = std::make_unique(context, edges, resultDataSet); + + for (auto* edge : edges) { + node_->addDependency(edge); + } + } + + nebula::cpp2::ErrorCode doExecute(PartitionID partId, const Cursor& cursor) override { + auto ret = RelNode::doExecute(partId); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + + std::string start; + std::string prefix = NebulaKeyUtils::edgePrefix(partId); + if (cursor.empty()) { + start = prefix; + } else { + start = cursor; + } + + std::unique_ptr iter; + auto kvRet = context_->env()->kvstore_->rangeWithPrefix( + context_->spaceId(), partId, start, prefix, &iter, enableReadFollower_); + if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + return kvRet; + } + + auto rowLimit = limit_; + RowReaderWrapper reader; + + for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { + auto key = iter->key(); + if (!NebulaKeyUtils::isEdge(context_->vIdLen(), key)) { + continue; + } + + auto srcId = NebulaKeyUtils::getSrcId(context_->vIdLen(), key); + auto dstId = NebulaKeyUtils::getDstId(context_->vIdLen(), key); + auto edgeType = NebulaKeyUtils::getEdgeType(context_->vIdLen(), key); + auto ranking = NebulaKeyUtils::getRank(context_->vIdLen(), key); + + if (edgeTypes_.find(edgeType) == edgeTypes_.end()) { + continue; + } + + cpp2::EdgeKey edgeKey; + edgeKey.set_src(srcId.subpiece(0, srcId.find_first_of('\0')).toString()); + edgeKey.set_dst(dstId.subpiece(0, dstId.find_first_of('\0')).toString()); + edgeKey.set_edge_type(edgeType); + edgeKey.set_ranking(ranking); + node_->doExecute(partId, std::move(edgeKey)); + } + + cpp2::ScanCursor c; + if (iter->valid()) { + c.set_has_next(true); + c.set_next_cursor(iter->key().str()); + } else { + c.set_has_next(false); + } + cursors_->emplace(partId, std::move(c)); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + private: + RuntimeContext* context_; + std::unordered_set edgeTypes_; + std::vector>> edgeNodes_; + std::unique_ptr node_; + bool enableReadFollower_; + int64_t limit_; + // cursors for next scan + std::unordered_map* cursors_; +}; + +} // namespace storage +} // namespace nebula diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 09a8d96eae0..fef9c6a6f9e 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -25,81 +25,35 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) { void ScanEdgeProcessor::doProcess(const cpp2::ScanEdgeRequest& req) { spaceId_ = req.get_space_id(); - partId_ = req.get_part_id(); + enableReadFollower_ = req.get_enable_read_from_follower(); + limit_ = req.get_limit(); auto retCode = getSpaceVidLen(spaceId_); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); + for (auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } + this->planContext_ = std::make_unique( + this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref()); + retCode = checkAndBuildContexts(req); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); - onFinished(); - return; - } - - std::string start; - std::string prefix = NebulaKeyUtils::edgePrefix(partId_); - if (req.get_cursor() == nullptr || req.get_cursor()->empty()) { - start = prefix; - } else { - start = *req.get_cursor(); - } - - std::unique_ptr iter; - auto kvRet = env_->kvstore_->rangeWithPrefix( - spaceId_, partId_, start, prefix, &iter, req.get_enable_read_from_follower()); - if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleErrorCode(kvRet, spaceId_, partId_); + for (auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } - auto rowLimit = req.get_limit(); - RowReaderWrapper reader; - - for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { - auto key = iter->key(); - if (!NebulaKeyUtils::isEdge(spaceVidLen_, key)) { - continue; - } - - auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, key); - auto edgeIter = edgeContext_.indexMap_.find(edgeType); - if (edgeIter == edgeContext_.indexMap_.end()) { - continue; - } - - auto val = iter->val(); - auto schemaIter = edgeContext_.schemas_.find(std::abs(edgeType)); - CHECK(schemaIter != edgeContext_.schemas_.end()); - reader.reset(schemaIter->second, val); - if (!reader) { - continue; - } - - nebula::List list; - auto idx = edgeIter->second; - auto props = &(edgeContext_.propContexts_[idx].second); - if (!QueryUtils::collectEdgeProps(key, spaceVidLen_, isIntId_, reader.get(), props, list) - .ok()) { - continue; - } - resultDataSet_.rows.emplace_back(std::move(list)); - rowCount++; - } - - if (iter->valid()) { - resp_.set_has_next(true); - resp_.set_next_cursor(iter->key().str()); + if (!FLAGS_query_concurrently) { + runInSingleThread(req); } else { - resp_.set_has_next(false); + runInMultipleThread(req); } - onProcessFinished(); - onFinished(); } nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::ScanEdgeRequest& req) { @@ -124,7 +78,107 @@ void ScanEdgeProcessor::buildEdgeColName(const std::vector& edge } } -void ScanEdgeProcessor::onProcessFinished() { resp_.set_edge_data(std::move(resultDataSet_)); } +void ScanEdgeProcessor::onProcessFinished() { + resp_.set_edge_data(std::move(resultDataSet_)); + resp_.set_cursors(std::move(cursors_)); +} + +StoragePlan ScanEdgeProcessor::buildPlan( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors) { + StoragePlan plan; + std::vector>> edges; + std::unordered_set edgeTypes; + for (const auto& ec : edgeContext_.propContexts_) { + edges.emplace_back( + std::make_unique(context, &edgeContext_, ec.first, &ec.second)); + edgeTypes.emplace(ec.first); + } + auto output = std::make_unique(context, + std::move(edgeTypes), + std::move(edges), + enableReadFollower_, + limit_, + cursors, + result); + + plan.addNode(std::move(output)); + return plan; +} + +folly::Future> ScanEdgeProcessor::runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors, + PartitionID partId, + Cursor cursor) { + return folly::via(executor_, + [this, context, result, cursors, partId, input = std::move(cursor)]() { + auto plan = buildPlan(context, result, cursors); + + auto ret = plan.go(partId, input); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return std::make_pair(ret, partId); + } + return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId); + }); +} + +void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) { + contexts_.emplace_back(RuntimeContext(planContext_.get())); + std::unordered_set failedParts; + auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_); + for (const auto& partEntry : req.get_parts()) { + auto partId = partEntry.first; + auto cursor = partEntry.second; + + auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && + failedParts.find(partId) == failedParts.end()) { + failedParts.emplace(partId); + handleErrorCode(ret, spaceId_, partId); + } + } + onProcessFinished(); + onFinished(); +} + +void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { + cursorsOfPart_.resize(req.get_parts().size()); + for (size_t i = 0; i < req.get_parts().size(); i++) { + nebula::DataSet result = resultDataSet_; + results_.emplace_back(std::move(result)); + contexts_.emplace_back(RuntimeContext(planContext_.get())); + } + size_t i = 0; + std::vector>> futures; + for (const auto& [partId, cursor] : req.get_parts()) { + futures.emplace_back(runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.get_has_next() ? *cursor.get_next_cursor() : "")); + i++; + } + + folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + CHECK(!t.hasException()); + const auto& tries = t.value(); + for (size_t j = 0; j < tries.size(); j++) { + CHECK(!tries[j].hasException()); + const auto& [code, partId] = tries[j].value(); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleErrorCode(code, spaceId_, partId); + } else { + resultDataSet_.append(std::move(results_[j])); + cursors_.merge(std::move(cursorsOfPart_[j])); + } + } + this->onProcessFinished(); + this->onFinished(); + }); +} } // namespace storage } // namespace nebula diff --git a/src/storage/query/ScanEdgeProcessor.h b/src/storage/query/ScanEdgeProcessor.h index 57b02776811..9e72b7d5b6b 100644 --- a/src/storage/query/ScanEdgeProcessor.h +++ b/src/storage/query/ScanEdgeProcessor.h @@ -8,6 +8,8 @@ #define STORAGE_QUERY_SCANEDGEPROCESSOR_H_ #include "common/base/Base.h" +#include "storage/exec/ScanNode.h" +#include "storage/exec/StoragePlan.h" #include "storage/query/QueryBaseProcessor.h" namespace nebula { @@ -36,9 +38,30 @@ class ScanEdgeProcessor : public QueryBaseProcessor& edgeProps); + StoragePlan buildPlan(RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors); + + folly::Future> runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors, + PartitionID partId, + Cursor cursor); + + void runInSingleThread(const cpp2::ScanEdgeRequest& req); + + void runInMultipleThread(const cpp2::ScanEdgeRequest& req); + void onProcessFinished() override; - PartitionID partId_; + std::vector contexts_; + std::vector results_; + std::vector> cursorsOfPart_; + + std::unordered_map cursors_; + int64_t limit_{-1}; + bool enableReadFollower_{false}; }; } // namespace storage diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 22c14ef92bf..b2b732c2272 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -25,77 +25,35 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) { void ScanVertexProcessor::doProcess(const cpp2::ScanVertexRequest& req) { spaceId_ = req.get_space_id(); - partId_ = req.get_part_id(); + limit_ = req.get_limit(); + enableReadFollower_ = req.get_enable_read_from_follower(); auto retCode = getSpaceVidLen(spaceId_); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); + for (const auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } + this->planContext_ = std::make_unique( + this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref()); + retCode = checkAndBuildContexts(req); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); - onFinished(); - return; - } - - std::string start; - std::string prefix = NebulaKeyUtils::vertexPrefix(partId_); - if (req.get_cursor() == nullptr || req.get_cursor()->empty()) { - start = prefix; - } else { - start = *req.get_cursor(); - } - - std::unique_ptr iter; - auto kvRet = env_->kvstore_->rangeWithPrefix( - spaceId_, partId_, start, prefix, &iter, req.get_enable_read_from_follower()); - if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleErrorCode(kvRet, spaceId_, partId_); + for (const auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } - auto rowLimit = req.get_limit(); - RowReaderWrapper reader; - for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { - auto key = iter->key(); - - auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key); - auto tagIter = tagContext_.indexMap_.find(tagId); - if (tagIter == tagContext_.indexMap_.end()) { - continue; - } - - auto val = iter->val(); - auto schemaIter = tagContext_.schemas_.find(tagId); - CHECK(schemaIter != tagContext_.schemas_.end()); - reader.reset(schemaIter->second, val); - if (!reader) { - continue; - } - - nebula::List list; - auto idx = tagIter->second; - auto props = &(tagContext_.propContexts_[idx].second); - if (!QueryUtils::collectVertexProps(key, spaceVidLen_, isIntId_, reader.get(), props, list) - .ok()) { - continue; - } - resultDataSet_.rows.emplace_back(std::move(list)); - rowCount++; - } - - if (iter->valid()) { - resp_.set_has_next(true); - resp_.set_next_cursor(iter->key().str()); + if (!FLAGS_query_concurrently) { + runInSingleThread(req); } else { - resp_.set_has_next(false); + runInMultipleThread(req); } - onProcessFinished(); - onFinished(); } nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( @@ -112,6 +70,7 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( } void ScanVertexProcessor::buildTagColName(const std::vector& tagProps) { + resultDataSet_.colNames.emplace_back(kVid); for (const auto& tagProp : tagProps) { auto tagId = tagProp.get_tag(); auto tagName = tagContext_.tagNames_[tagId]; @@ -121,7 +80,99 @@ void ScanVertexProcessor::buildTagColName(const std::vector& t } } -void ScanVertexProcessor::onProcessFinished() { resp_.set_vertex_data(std::move(resultDataSet_)); } +void ScanVertexProcessor::onProcessFinished() { + resp_.set_vertex_data(std::move(resultDataSet_)); + resp_.set_cursors(std::move(cursors_)); +} + +StoragePlan ScanVertexProcessor::buildPlan( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors) { + StoragePlan plan; + std::vector> tags; + for (const auto& tc : tagContext_.propContexts_) { + tags.emplace_back(std::make_unique(context, &tagContext_, tc.first, &tc.second)); + } + auto output = std::make_unique( + context, std::move(tags), enableReadFollower_, limit_, cursors, result); + + plan.addNode(std::move(output)); + return plan; +} + +folly::Future> ScanVertexProcessor::runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursorsOfPart, + PartitionID partId, + Cursor cursor) { + return folly::via(executor_, + [this, context, result, cursorsOfPart, partId, input = std::move(cursor)]() { + auto plan = buildPlan(context, result, cursorsOfPart); + + auto ret = plan.go(partId, input); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return std::make_pair(ret, partId); + } + return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId); + }); +} + +void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req) { + contexts_.emplace_back(RuntimeContext(planContext_.get())); + std::unordered_set failedParts; + auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_); + for (const auto& partEntry : req.get_parts()) { + auto partId = partEntry.first; + auto cursor = partEntry.second; + + auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && + failedParts.find(partId) == failedParts.end()) { + failedParts.emplace(partId); + handleErrorCode(ret, spaceId_, partId); + } + } + onProcessFinished(); + onFinished(); +} + +void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req) { + cursorsOfPart_.resize(req.get_parts().size()); + for (size_t i = 0; i < req.get_parts().size(); i++) { + nebula::DataSet result = resultDataSet_; + results_.emplace_back(std::move(result)); + contexts_.emplace_back(RuntimeContext(planContext_.get())); + } + size_t i = 0; + std::vector>> futures; + for (const auto& [partId, cursor] : req.get_parts()) { + futures.emplace_back(runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.get_has_next() ? *cursor.get_next_cursor() : "")); + i++; + } + + folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + CHECK(!t.hasException()); + const auto& tries = t.value(); + for (size_t j = 0; j < tries.size(); j++) { + CHECK(!tries[j].hasException()); + const auto& [code, partId] = tries[j].value(); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleErrorCode(code, spaceId_, partId); + } else { + resultDataSet_.append(std::move(results_[j])); + cursors_.merge(std::move(cursorsOfPart_[j])); + } + } + this->onProcessFinished(); + this->onFinished(); + }); +} } // namespace storage } // namespace nebula diff --git a/src/storage/query/ScanVertexProcessor.h b/src/storage/query/ScanVertexProcessor.h index fc075f3c33c..9a42b5ea59d 100644 --- a/src/storage/query/ScanVertexProcessor.h +++ b/src/storage/query/ScanVertexProcessor.h @@ -8,6 +8,8 @@ #define STORAGE_QUERY_SCANVERTEXPROCESSOR_H_ #include "common/base/Base.h" +#include "storage/exec/ScanNode.h" +#include "storage/exec/StoragePlan.h" #include "storage/query/QueryBaseProcessor.h" namespace nebula { @@ -37,10 +39,31 @@ class ScanVertexProcessor void buildTagColName(const std::vector& tagProps); + StoragePlan buildPlan(RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors); + + folly::Future> runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors, + PartitionID partId, + Cursor cursor); + + void runInSingleThread(const cpp2::ScanVertexRequest& req); + + void runInMultipleThread(const cpp2::ScanVertexRequest& req); + void onProcessFinished() override; private: - PartitionID partId_; + std::vector contexts_; + std::vector results_; + std::vector> cursorsOfPart_; + + std::unordered_map cursors_; + int64_t limit_{-1}; + bool enableReadFollower_{false}; }; } // namespace storage diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 0ed763a152e..38a042777fd 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -23,8 +23,12 @@ cpp2::ScanEdgeRequest buildRequest(PartitionID partId, bool onlyLatestVer = false) { cpp2::ScanEdgeRequest req; req.set_space_id(1); - req.set_part_id(partId); - req.set_cursor(cursor); + cpp2::ScanCursor c; + c.set_has_next(true); + c.set_next_cursor(cursor); + std::unordered_map parts; + parts.emplace(partId, std::move(c)); + req.set_parts(std::move(parts)); EdgeType edgeType = edge.first; cpp2::EdgeProp edgeProp; edgeProp.set_type(edgeType); @@ -156,10 +160,10 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref().has_value()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } @@ -183,10 +187,10 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref().has_value()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 729b6ab0f7b..5ccc2542542 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -23,8 +23,12 @@ cpp2::ScanVertexRequest buildRequest(PartitionID partId, bool onlyLatestVer = false) { cpp2::ScanVertexRequest req; req.set_space_id(1); - req.set_part_id(partId); - req.set_cursor(cursor); + cpp2::ScanCursor c; + c.set_has_next(true); + c.set_next_cursor(cursor); + std::unordered_map parts; + parts.emplace(partId, std::move(c)); + req.set_parts(std::move(parts)); TagID tagId = tag.first; cpp2::VertexProp vertexProp; vertexProp.set_tag(tagId); @@ -45,9 +49,10 @@ void checkResponse(const nebula::DataSet& dataSet, size_t& totalRowCount) { ASSERT_EQ(dataSet.colNames.size(), expectColumnCount); if (!tag.second.empty()) { - ASSERT_EQ(dataSet.colNames.size(), tag.second.size()); - for (size_t i = 0; i < dataSet.colNames.size(); i++) { - ASSERT_EQ(dataSet.colNames[i], std::to_string(tag.first) + "." + tag.second[i]); + ASSERT_EQ(dataSet.colNames.size(), tag.second.size() + 1 /* kVid*/); + for (size_t i = 0; i < dataSet.colNames.size() - 1 /* kVid */; i++) { + ASSERT_EQ(dataSet.colNames[i + 1 /* kVid */], + std::to_string(tag.first) + "." + tag.second[i]); } } totalRowCount += dataSet.rows.size(); @@ -64,13 +69,17 @@ void checkResponse(const nebula::DataSet& dataSet, mock::MockData::players_.end(), [&](const auto& player) { return player.name_ == vId; }); CHECK(iter != mock::MockData::players_.end()); - QueryTestUtils::checkPlayer(props, *iter, row.values); + std::vector returnProps({kVid}); + returnProps.insert(returnProps.end(), props.begin(), props.end()); + QueryTestUtils::checkPlayer(returnProps, *iter, row.values); break; } case 2: { // tag team auto iter = std::find(mock::MockData::teams_.begin(), mock::MockData::teams_.end(), vId); - QueryTestUtils::checkTeam(props, *iter, row.values); + std::vector returnProps({kVid}); + returnProps.insert(returnProps.end(), props.begin(), props.end()); + QueryTestUtils::checkTeam(returnProps, *iter, row.values); break; } default: @@ -103,7 +112,7 @@ TEST(ScanVertexTest, PropertyTest) { auto resp = std::move(f).get(); ASSERT_EQ(0, resp.result.failed_parts.size()); - checkResponse(*resp.vertex_data_ref(), tag, tag.second.size(), totalRowCount); + checkResponse(*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); } CHECK_EQ(mock::MockData::players_.size(), totalRowCount); } @@ -111,6 +120,18 @@ TEST(ScanVertexTest, PropertyTest) { LOG(INFO) << "Scan one tag with all properties in one batch"; size_t totalRowCount = 0; auto tag = std::make_pair(player, std::vector{}); + auto respTag = std::make_pair(player, + std::vector{"name", + "age", + "playing", + "career", + "startYear", + "endYear", + "games", + "avgScore", + "serveTeams", + "country", + "champions"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { auto req = buildRequest(partId, "", tag); auto* processor = ScanVertexProcessor::instance(env, nullptr); @@ -120,7 +141,7 @@ TEST(ScanVertexTest, PropertyTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); // all 11 columns in value - checkResponse(*resp.vertex_data_ref(), tag, 11, totalRowCount); + checkResponse(*resp.vertex_data_ref(), respTag, 11 + 1 /* kVid */, totalRowCount); } CHECK_EQ(mock::MockData::players_.size(), totalRowCount); } @@ -153,11 +174,12 @@ TEST(ScanVertexTest, CursorTest) { auto resp = std::move(f).get(); ASSERT_EQ(0, resp.result.failed_parts.size()); - checkResponse(*resp.vertex_data_ref(), tag, tag.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + checkResponse( + *resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } @@ -179,11 +201,12 @@ TEST(ScanVertexTest, CursorTest) { auto resp = std::move(f).get(); ASSERT_EQ(0, resp.result.failed_parts.size()); - checkResponse(*resp.vertex_data_ref(), tag, tag.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + checkResponse( + *resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } From dd0790e110ead33a2fcd5d812a84ebf947a7c523 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Wed, 3 Nov 2021 11:20:56 +0800 Subject: [PATCH 02/10] Add multiple parts test case. --- src/storage/test/ScanEdgeTest.cpp | 63 ++++++++++++++++++++---- src/storage/test/ScanVertexTest.cpp | 75 +++++++++++++++++++++++++---- 2 files changed, 120 insertions(+), 18 deletions(-) diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 38a042777fd..8678f0daad4 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -14,8 +14,8 @@ namespace nebula { namespace storage { -cpp2::ScanEdgeRequest buildRequest(PartitionID partId, - const std::string& cursor, +cpp2::ScanEdgeRequest buildRequest(std::vector partIds, + std::vector cursors, const std::pair>& edge, int64_t rowLimit = 100, int64_t startTime = 0, @@ -24,10 +24,13 @@ cpp2::ScanEdgeRequest buildRequest(PartitionID partId, cpp2::ScanEdgeRequest req; req.set_space_id(1); cpp2::ScanCursor c; - c.set_has_next(true); - c.set_next_cursor(cursor); + CHECK_EQ(partIds.size(), cursors.size()); std::unordered_map parts; - parts.emplace(partId, std::move(c)); + for (std::size_t i = 0; i < partIds.size(); ++i) { + c.set_has_next(!cursors[i].empty()); + c.set_next_cursor(cursors[i]); + parts.emplace(partIds[i], c); + } req.set_parts(std::move(parts)); EdgeType edgeType = edge.first; cpp2::EdgeProp edgeProp; @@ -101,7 +104,7 @@ TEST(ScanEdgeTest, PropertyTest) { serve, std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", edge); + auto req = buildRequest({partId}, {""}, edge); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -117,7 +120,7 @@ TEST(ScanEdgeTest, PropertyTest) { size_t totalRowCount = 0; auto edge = std::make_pair(serve, std::vector{}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", edge); + auto req = buildRequest({partId}, {""}, edge); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -152,7 +155,7 @@ TEST(ScanEdgeTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, edge, 5); + auto req = buildRequest({partId}, {cursor}, edge, 5); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -179,7 +182,7 @@ TEST(ScanEdgeTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, edge, 1); + auto req = buildRequest({partId}, {cursor}, edge, 1); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -198,6 +201,48 @@ TEST(ScanEdgeTest, CursorTest) { } } +TEST(ScanEdgeTest, MultiplePartsTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + EdgeType serve = 101; + + { + LOG(INFO) << "Scan one edge with some properties in one batch"; + size_t totalRowCount = 0; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + auto req = buildRequest({1, 3}, {"", ""}, edge); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); + } + { + LOG(INFO) << "Scan one edge with all properties in one batch"; + size_t totalRowCount = 0; + auto edge = std::make_pair(serve, std::vector{}); + auto req = buildRequest({1, 3}, {"", ""}, edge); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 9 columns in value + checkResponse(*resp.edge_data_ref(), edge, 9, totalRowCount); + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 5ccc2542542..8a163aecb69 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -4,6 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ +#include #include #include "common/base/Base.h" @@ -14,8 +15,8 @@ namespace nebula { namespace storage { -cpp2::ScanVertexRequest buildRequest(PartitionID partId, - const std::string& cursor, +cpp2::ScanVertexRequest buildRequest(std::vector partIds, + std::vector cursors, const std::pair>& tag, int64_t rowLimit = 100, int64_t startTime = 0, @@ -24,10 +25,13 @@ cpp2::ScanVertexRequest buildRequest(PartitionID partId, cpp2::ScanVertexRequest req; req.set_space_id(1); cpp2::ScanCursor c; - c.set_has_next(true); - c.set_next_cursor(cursor); + CHECK_EQ(partIds.size(), cursors.size()); std::unordered_map parts; - parts.emplace(partId, std::move(c)); + for (std::size_t i = 0; i < partIds.size(); ++i) { + c.set_has_next(!cursors[i].empty()); + c.set_next_cursor(cursors[i]); + parts.emplace(partIds[i], c); + } req.set_parts(std::move(parts)); TagID tagId = tag.first; cpp2::VertexProp vertexProp; @@ -105,7 +109,7 @@ TEST(ScanVertexTest, PropertyTest) { auto tag = std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", tag); + auto req = buildRequest({partId}, {""}, tag); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -133,7 +137,7 @@ TEST(ScanVertexTest, PropertyTest) { "country", "champions"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", tag); + auto req = buildRequest({partId}, {""}, tag); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -167,7 +171,7 @@ TEST(ScanVertexTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, tag, 5); + auto req = buildRequest({partId}, {cursor}, tag, 5); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -194,7 +198,7 @@ TEST(ScanVertexTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, tag, 1); + auto req = buildRequest({partId}, {cursor}, tag, 1); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -214,6 +218,59 @@ TEST(ScanVertexTest, CursorTest) { } } +TEST(ScanVertexTest, MultiplePartsTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + size_t totalRowCount = 0; + auto tag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto req = buildRequest({1, 3}, {"", ""}, tag); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + } + { + LOG(INFO) << "Scan one tag with all properties in one batch"; + size_t totalRowCount = 0; + auto tag = std::make_pair(player, std::vector{}); + auto respTag = std::make_pair(player, + std::vector{"name", + "age", + "playing", + "career", + "startYear", + "endYear", + "games", + "avgScore", + "serveTeams", + "country", + "champions"}); + auto req = buildRequest({1, 3}, {"", ""}, tag); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 11 columns in value + checkResponse(*resp.vertex_data_ref(), respTag, 11 + 1 /* kVid */, totalRowCount); + } +} + } // namespace storage } // namespace nebula From f126d77c2d053963fc0a32f684618fb542777278 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Wed, 3 Nov 2021 14:51:15 +0800 Subject: [PATCH 03/10] Add limit test. --- src/storage/exec/ScanNode.h | 15 +++++--- src/storage/test/ScanEdgeTest.cpp | 47 ++++++++++++++++++++++++ src/storage/test/ScanVertexTest.cpp | 57 +++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 5 deletions(-) diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 058055bc2d0..e73014115eb 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -31,7 +31,8 @@ class ScanVertexPropNode : public QueryNode { tagNodes_(std::move(tagNodes)), enableReadFollower_(enableReadFollower), limit_(limit), - cursors_(cursors) { + cursors_(cursors), + resultDataSet_(resultDataSet) { name_ = "ScanVertexPropNode"; std::vector tags; for (const auto& t : tagNodes_) { @@ -67,7 +68,8 @@ class ScanVertexPropNode : public QueryNode { const auto rowLimit = limit_; RowReaderWrapper reader; std::string currentVertexId; - for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { + for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; + iter->next()) { auto key = iter->key(); auto vertexId = NebulaKeyUtils::getVertexId(context_->vIdLen(), key); @@ -78,7 +80,6 @@ class ScanVertexPropNode : public QueryNode { continue; } node_->doExecute(partId, vertexId.subpiece(0, vertexId.find_first_of('\0')).toString()); - rowCount++; } cpp2::ScanCursor c; @@ -100,6 +101,7 @@ class ScanVertexPropNode : public QueryNode { int64_t limit_; // cursors for next scan std::unordered_map* cursors_; + nebula::DataSet* resultDataSet_; }; // Node to scan edge of one partition @@ -119,7 +121,8 @@ class ScanEdgePropNode : public QueryNode { edgeNodes_(std::move(edgeNodes)), enableReadFollower_(enableReadFollower), limit_(limit), - cursors_(cursors) { + cursors_(cursors), + resultDataSet_(resultDataSet) { QueryNode::name_ = "ScanEdgePropNode"; std::vector*> edges; for (const auto& e : edgeNodes_) { @@ -156,7 +159,8 @@ class ScanEdgePropNode : public QueryNode { auto rowLimit = limit_; RowReaderWrapper reader; - for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { + for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; + iter->next()) { auto key = iter->key(); if (!NebulaKeyUtils::isEdge(context_->vIdLen(), key)) { continue; @@ -199,6 +203,7 @@ class ScanEdgePropNode : public QueryNode { int64_t limit_; // cursors for next scan std::unordered_map* cursors_; + nebula::DataSet* resultDataSet_; }; } // namespace storage diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 8678f0daad4..2d004d28d00 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -4,6 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ +#include #include #include "common/base/Base.h" @@ -243,6 +244,52 @@ TEST(ScanEdgeTest, MultiplePartsTest) { } } +TEST(ScanEdgeTest, LimitTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + EdgeType serve = 101; + + { + LOG(INFO) << "Scan one edge with some properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + auto req = buildRequest({1}, {""}, edge, limit); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } + { + LOG(INFO) << "Scan one edge with all properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto edge = std::make_pair(serve, std::vector{}); + auto req = buildRequest({1}, {""}, edge, limit); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 9 columns in value + checkResponse(*resp.edge_data_ref(), edge, 9, totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 8a163aecb69..73714442002 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -271,6 +271,63 @@ TEST(ScanVertexTest, MultiplePartsTest) { } } +TEST(ScanVertexTest, LimitTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto tag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto req = buildRequest({2}, {""}, tag, limit); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } + { + LOG(INFO) << "Scan one tag with all properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto tag = std::make_pair(player, std::vector{}); + auto respTag = std::make_pair(player, + std::vector{"name", + "age", + "playing", + "career", + "startYear", + "endYear", + "games", + "avgScore", + "serveTeams", + "country", + "champions"}); + auto req = buildRequest({2}, {""}, tag, limit); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 11 columns in value + checkResponse(*resp.vertex_data_ref(), respTag, 11 + 1 /* kVid */, totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } +} + } // namespace storage } // namespace nebula From fa11ae5c5e08cfb26754e022ce47d300a61b5172 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Wed, 3 Nov 2021 16:51:51 +0800 Subject: [PATCH 04/10] Remove unused include. --- src/storage/exec/ScanNode.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index e73014115eb..ddf6e81b882 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -6,8 +6,6 @@ #pragma once -#include - #include "common/base/Base.h" #include "storage/exec/GetPropNode.h" From 0fe975484320154cf1d8cf42d5d0ef187741dbbc Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Thu, 4 Nov 2021 16:31:58 +0800 Subject: [PATCH 05/10] Support multiple tags. --- src/clients/storage/GraphStorageClient.cpp | 2 +- src/clients/storage/GraphStorageClient.h | 9 +- src/interface/storage.thrift | 2 +- src/storage/query/ScanVertexProcessor.cpp | 2 +- src/storage/test/ScanVertexTest.cpp | 152 ++++++++++++++++++--- 5 files changed, 139 insertions(+), 28 deletions(-) diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/GraphStorageClient.cpp index e4400c0e9fb..3769e77af07 100644 --- a/src/clients/storage/GraphStorageClient.cpp +++ b/src/clients/storage/GraphStorageClient.cpp @@ -593,7 +593,7 @@ StorageRpcRespFuture GraphStorageClient::scanEdge( StorageRpcRespFuture GraphStorageClient::scanVertex( const CommonRequestParam& param, - const cpp2::VertexProp& vertexProp, + const std::vector& vertexProp, int64_t limit, const Expression* filter) { std::unordered_map requests; diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/GraphStorageClient.h index ddb1b5b7d78..48c289b018e 100644 --- a/src/clients/storage/GraphStorageClient.h +++ b/src/clients/storage/GraphStorageClient.h @@ -136,10 +136,11 @@ class GraphStorageClient : public StorageClientBase scanVertex(const CommonRequestParam& param, - const cpp2::VertexProp& vertexProp, - int64_t limit, - const Expression* filter); + StorageRpcRespFuture scanVertex( + const CommonRequestParam& param, + const std::vector& vertexProp, + int64_t limit, + const Expression* filter); private: StatusOr> getIdFromRow(GraphSpaceID space, diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index fcd275e30a3..72b107c3311 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -567,7 +567,7 @@ struct ScanVertexRequest { 1: common.GraphSpaceID space_id, 2: map (cpp.template = "std::unordered_map") parts, - 3: VertexProp return_columns, + 3: list return_columns, // max row count of tag in this response 4: i64 limit, // only return data in time range [start_time, end_time) diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index b2b732c2272..0127c4a9e59 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -63,7 +63,7 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( return ret; } - std::vector returnProps = {*req.return_columns_ref()}; + std::vector returnProps = *req.return_columns_ref(); ret = handleVertexProps(returnProps); buildTagColName(returnProps); return ret; diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 73714442002..f7b058aa0c2 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -15,13 +15,14 @@ namespace nebula { namespace storage { -cpp2::ScanVertexRequest buildRequest(std::vector partIds, - std::vector cursors, - const std::pair>& tag, - int64_t rowLimit = 100, - int64_t startTime = 0, - int64_t endTime = std::numeric_limits::max(), - bool onlyLatestVer = false) { +cpp2::ScanVertexRequest buildRequest( + std::vector partIds, + std::vector cursors, + const std::vector>>& tags, + int64_t rowLimit = 100, + int64_t startTime = 0, + int64_t endTime = std::numeric_limits::max(), + bool onlyLatestVer = false) { cpp2::ScanVertexRequest req; req.set_space_id(1); cpp2::ScanCursor c; @@ -33,13 +34,17 @@ cpp2::ScanVertexRequest buildRequest(std::vector partIds, parts.emplace(partIds[i], c); } req.set_parts(std::move(parts)); - TagID tagId = tag.first; - cpp2::VertexProp vertexProp; - vertexProp.set_tag(tagId); - for (const auto& prop : tag.second) { - (*vertexProp.props_ref()).emplace_back(std::move(prop)); + std::vector vertexProps; + for (const auto& tag : tags) { + TagID tagId = tag.first; + cpp2::VertexProp vertexProp; + vertexProp.set_tag(tagId); + for (const auto& prop : tag.second) { + (*vertexProp.props_ref()).emplace_back(std::move(prop)); + } + vertexProps.emplace_back(std::move(vertexProp)); } - req.set_return_columns(std::move(vertexProp)); + req.set_return_columns(std::move(vertexProps)); req.set_limit(rowLimit); req.set_start_time(startTime); req.set_end_time(endTime); @@ -109,7 +114,7 @@ TEST(ScanVertexTest, PropertyTest) { auto tag = std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest({partId}, {""}, tag); + auto req = buildRequest({partId}, {""}, {tag}); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -137,7 +142,7 @@ TEST(ScanVertexTest, PropertyTest) { "country", "champions"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest({partId}, {""}, tag); + auto req = buildRequest({partId}, {""}, {tag}); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -171,7 +176,7 @@ TEST(ScanVertexTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest({partId}, {cursor}, tag, 5); + auto req = buildRequest({partId}, {cursor}, {tag}, 5); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -198,7 +203,7 @@ TEST(ScanVertexTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest({partId}, {cursor}, tag, 1); + auto req = buildRequest({partId}, {cursor}, {tag}, 1); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -234,7 +239,7 @@ TEST(ScanVertexTest, MultiplePartsTest) { size_t totalRowCount = 0; auto tag = std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); - auto req = buildRequest({1, 3}, {"", ""}, tag); + auto req = buildRequest({1, 3}, {"", ""}, {tag}); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -259,7 +264,7 @@ TEST(ScanVertexTest, MultiplePartsTest) { "serveTeams", "country", "champions"}); - auto req = buildRequest({1, 3}, {"", ""}, tag); + auto req = buildRequest({1, 3}, {"", ""}, {tag}); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -288,7 +293,7 @@ TEST(ScanVertexTest, LimitTest) { size_t totalRowCount = 0; auto tag = std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); - auto req = buildRequest({2}, {""}, tag, limit); + auto req = buildRequest({2}, {""}, {tag}, limit); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -315,7 +320,7 @@ TEST(ScanVertexTest, LimitTest) { "serveTeams", "country", "champions"}); - auto req = buildRequest({2}, {""}, tag, limit); + auto req = buildRequest({2}, {""}, {tag}, limit); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -328,6 +333,111 @@ TEST(ScanVertexTest, LimitTest) { } } +TEST(ScanVertexTest, MultipleTagsTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + TagID team = 2; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + // size_t totalRowCount = 0; + auto playerTag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto teamTag = std::make_pair(team, std::vector{kTag, "name"}); + auto req = buildRequest({1}, {""}, {playerTag, teamTag}); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + nebula::DataSet expect( + {"_vid", "1._vid", "1._tag", "1.name", "1.age", "1.avgScore", "2._tag", "2.name"}); + expect.emplace_back(List({"Bulls", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Bulls"})); + expect.emplace_back(List({"Cavaliers", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Cavaliers"})); + expect.emplace_back(List({"Damian Lillard", + "Damian Lillard", + 1, + "Damian Lillard", + 29, + 24, + Value::kEmpty, + Value::kEmpty})); + expect.emplace_back(List( + {"Jason Kidd", "Jason Kidd", 1, "Jason Kidd", 47, 12.6, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List( + {"Kevin Durant", "Kevin Durant", 1, "Kevin Durant", 31, 27, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List( + {"Kobe Bryant", "Kobe Bryant", 1, "Kobe Bryant", 41, 25, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List({"Kristaps Porzingis", + "Kristaps Porzingis", + 1, + "Kristaps Porzingis", + 24, + 18.1, + Value::kEmpty, + Value::kEmpty})); + expect.emplace_back(List( + {"Luka Doncic", "Luka Doncic", 1, "Luka Doncic", 21, 24.4, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List({"Mavericks", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Mavericks"})); + expect.emplace_back(List({"Nuggets", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Nuggets"})); + expect.emplace_back(List( + {"Paul George", "Paul George", 1, "Paul George", 30, 19.9, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List({"Tracy McGrady", + "Tracy McGrady", + 1, + "Tracy McGrady", + 41, + 19.6, + Value::kEmpty, + Value::kEmpty})); + expect.emplace_back(List({"Vince Carter", + "Vince Carter", + 1, + "Vince Carter", + 43, + 16.7, + Value::kEmpty, + Value::kEmpty})); + EXPECT_EQ(expect, *resp.vertex_data_ref()); + } +} + } // namespace storage } // namespace nebula From d8958defb1bdb214046d5171788735611e4f53a1 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 5 Nov 2021 11:17:03 +0800 Subject: [PATCH 06/10] Fix license header. --- src/storage/exec/ScanNode.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index ddf6e81b882..1c32fa094dd 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -1,7 +1,6 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. +/* Copyright (c) 2020 vesoft inc. All rights reserved. * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. + * This source code is licensed under Apache 2.0 License. */ #pragma once From 236e4f9ee3971ed1d963d9f3ba7b3ac0261af3e4 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Tue, 9 Nov 2021 11:30:35 +0800 Subject: [PATCH 07/10] Optimize the extra read operations. --- src/storage/exec/EdgeNode.h | 21 ++- src/storage/exec/ScanNode.h | 166 ++++++++++++++++++------ src/storage/exec/TagNode.h | 21 ++- src/storage/query/ScanEdgeProcessor.cpp | 13 +- 4 files changed, 162 insertions(+), 59 deletions(-) diff --git a/src/storage/exec/EdgeNode.h b/src/storage/exec/EdgeNode.h index 988923f5f44..beee24665f3 100644 --- a/src/storage/exec/EdgeNode.h +++ b/src/storage/exec/EdgeNode.h @@ -26,7 +26,9 @@ class EdgeNode : public IterateNode { return valueHandler(this->key(), this->reader(), props_); } - const std::string& getEdgeName() { return edgeName_; } + const std::string& getEdgeName() const { return edgeName_; } + + EdgeType edgeType() const { return edgeType_; } protected: EdgeNode(RuntimeContext* context, @@ -113,8 +115,7 @@ class FetchEdgeNode final : public EdgeNode { (*edgeKey.dst_ref()).getStr()); ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &val_); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { - resetReader(); - return nebula::cpp2::ErrorCode::SUCCEEDED; + return doExecute(key_, val_); } else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { // regard key not found as succeed as well, upper node will handle it return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -122,6 +123,20 @@ class FetchEdgeNode final : public EdgeNode { return ret; } + nebula::cpp2::ErrorCode doExecute(const std::string& key, const std::string& value) { + key_ = key; + val_ = value; + resetReader(); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + void clear() { + valid_ = false; + key_.clear(); + val_.clear(); + reader_.reset(); + } + private: void resetReader() { reader_.reset(*schemas_, val_); diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 1c32fa094dd..1b92f742fb4 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -31,13 +31,8 @@ class ScanVertexPropNode : public QueryNode { cursors_(cursors), resultDataSet_(resultDataSet) { name_ = "ScanVertexPropNode"; - std::vector tags; - for (const auto& t : tagNodes_) { - tags.emplace_back(t.get()); - } - node_ = std::make_unique(context, tags, resultDataSet); - for (auto* tag : tags) { - node_->addDependency(tag); + for (std::size_t i = 0; i < tagNodes_.size(); ++i) { + tagNodesIndex_.emplace(tagNodes_[i]->tagId(), i); } } @@ -64,19 +59,47 @@ class ScanVertexPropNode : public QueryNode { const auto rowLimit = limit_; RowReaderWrapper reader; + auto vIdLen = context_->vIdLen(); + auto isIntId = context_->isIntId(); std::string currentVertexId; + for (; iter->valid(); iter->next()) { + auto key = iter->key(); + auto tagId = NebulaKeyUtils::getTagId(vIdLen, key); + auto tagIdIndex = tagNodesIndex_.find(tagId); + if (tagIdIndex != tagNodesIndex_.end()) { + break; + } + } + if (iter->valid()) { + auto vIdSlice = NebulaKeyUtils::getVertexId(vIdLen, iter->key()); + currentVertexId = vIdSlice.subpiece(0, vIdSlice.find_first_of('\0')).toString(); + } for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; iter->next()) { auto key = iter->key(); - - auto vertexId = NebulaKeyUtils::getVertexId(context_->vIdLen(), key); + auto tagId = NebulaKeyUtils::getTagId(vIdLen, key); + auto tagIdIndex = tagNodesIndex_.find(tagId); + if (tagIdIndex == tagNodesIndex_.end()) { + continue; + } + auto vIdSlice = NebulaKeyUtils::getVertexId(vIdLen, key); + auto vertexId = vIdSlice.subpiece(0, vIdSlice.find_first_of('\0')); if (vertexId != currentVertexId) { + collectOneRow(isIntId, vIdLen, currentVertexId); currentVertexId = vertexId; - } else { - // skip same vertex id tag key - continue; + } // collect vertex row + if (static_cast(resultDataSet_->rowSize()) >= rowLimit) { + break; + } + auto value = iter->val(); + tagNodes_[tagIdIndex->second]->doExecute(key.toString(), value.toString()); + } // iterate key + if (static_cast(resultDataSet_->rowSize()) < rowLimit) { + collectOneRow(isIntId, vIdLen, currentVertexId); + } else { + for (auto& tagNode : tagNodes_) { + tagNode->clear(); } - node_->doExecute(partId, vertexId.subpiece(0, vertexId.find_first_of('\0')).toString()); } cpp2::ScanCursor c; @@ -90,10 +113,55 @@ class ScanVertexPropNode : public QueryNode { return nebula::cpp2::ErrorCode::SUCCEEDED; } + void collectOneRow(bool isIntId, std::size_t vIdLen, const std::string& currentVertexId) { + List row; + nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; + // vertexId is the first column + if (isIntId) { + row.emplace_back(*reinterpret_cast(currentVertexId.data())); + } else { + row.emplace_back(currentVertexId); + } + // if none of the tag node valid, do not emplace the row + if (std::any_of(tagNodes_.begin(), tagNodes_.end(), [](const auto& tagNode) { + return tagNode->valid(); + })) { + for (auto& tagNode : tagNodes_) { + ret = tagNode->collectTagPropsIfValid( + [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + for (const auto& prop : *props) { + if (prop.returned_) { + row.emplace_back(Value()); + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }, + [&row, vIdLen, isIntId]( + folly::StringPiece key, + RowReader* reader, + const std::vector* props) -> nebula::cpp2::ErrorCode { + if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + break; + } + } + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + for (auto& tagNode : tagNodes_) { + tagNode->clear(); + } + } + } + private: RuntimeContext* context_; - std::unique_ptr node_; std::vector> tagNodes_; + std::unordered_map tagNodesIndex_; bool enableReadFollower_; int64_t limit_; // cursors for next scan @@ -107,28 +175,20 @@ class ScanEdgePropNode : public QueryNode { using RelNode::doExecute; ScanEdgePropNode(RuntimeContext* context, - std::unordered_set edgeTypes, - std::vector>> edgeNodes, + std::vector> edgeNodes, bool enableReadFollower, int64_t limit, std::unordered_map* cursors, nebula::DataSet* resultDataSet) : context_(context), - edgeTypes_(std::move(edgeTypes)), edgeNodes_(std::move(edgeNodes)), enableReadFollower_(enableReadFollower), limit_(limit), cursors_(cursors), resultDataSet_(resultDataSet) { QueryNode::name_ = "ScanEdgePropNode"; - std::vector*> edges; - for (const auto& e : edgeNodes_) { - edges.emplace_back(e.get()); - } - node_ = std::make_unique(context, edges, resultDataSet); - - for (auto* edge : edges) { - node_->addDependency(edge); + for (std::size_t i = 0; i < edgeNodes_.size(); ++i) { + edgeNodesIndex_.emplace(edgeNodes_[i]->edgeType(), i); } } @@ -155,29 +215,50 @@ class ScanEdgePropNode : public QueryNode { auto rowLimit = limit_; RowReaderWrapper reader; - + auto vIdLen = context_->vIdLen(); + auto isIntId = context_->isIntId(); for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; iter->next()) { auto key = iter->key(); - if (!NebulaKeyUtils::isEdge(context_->vIdLen(), key)) { + if (!NebulaKeyUtils::isEdge(vIdLen, key)) { continue; } - - auto srcId = NebulaKeyUtils::getSrcId(context_->vIdLen(), key); - auto dstId = NebulaKeyUtils::getDstId(context_->vIdLen(), key); - auto edgeType = NebulaKeyUtils::getEdgeType(context_->vIdLen(), key); - auto ranking = NebulaKeyUtils::getRank(context_->vIdLen(), key); - - if (edgeTypes_.find(edgeType) == edgeTypes_.end()) { + auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen, key); + auto edgeNodeIndex = edgeNodesIndex_.find(edgeType); + if (edgeNodeIndex == edgeNodesIndex_.end()) { continue; } + auto value = iter->val(); + edgeNodes_[edgeNodeIndex->second]->doExecute(key.toString(), value.toString()); - cpp2::EdgeKey edgeKey; - edgeKey.set_src(srcId.subpiece(0, srcId.find_first_of('\0')).toString()); - edgeKey.set_dst(dstId.subpiece(0, dstId.find_first_of('\0')).toString()); - edgeKey.set_edge_type(edgeType); - edgeKey.set_ranking(ranking); - node_->doExecute(partId, std::move(edgeKey)); + List row; + for (auto& edgeNode : edgeNodes_) { + ret = edgeNode->collectEdgePropsIfValid( + [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + for (const auto& prop : *props) { + if (prop.returned_) { + row.emplace_back(Value()); + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }, + [&row, vIdLen, isIntId]( + folly::StringPiece key, + RowReader* reader, + const std::vector* props) -> nebula::cpp2::ErrorCode { + if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + } + resultDataSet_->rows.emplace_back(std::move(row)); + for (auto& edgeNode : edgeNodes_) { + edgeNode->clear(); + } } cpp2::ScanCursor c; @@ -193,9 +274,8 @@ class ScanEdgePropNode : public QueryNode { private: RuntimeContext* context_; - std::unordered_set edgeTypes_; - std::vector>> edgeNodes_; - std::unique_ptr node_; + std::vector> edgeNodes_; + std::unordered_map edgeNodesIndex_; bool enableReadFollower_; int64_t limit_; // cursors for next scan diff --git a/src/storage/exec/TagNode.h b/src/storage/exec/TagNode.h index e203b494b6c..d6d597addc8 100644 --- a/src/storage/exec/TagNode.h +++ b/src/storage/exec/TagNode.h @@ -53,8 +53,7 @@ class TagNode final : public IterateNode { key_ = NebulaKeyUtils::vertexKey(context_->vIdLen(), partId, vId, tagId_); ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &value_); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { - resetReader(); - return nebula::cpp2::ErrorCode::SUCCEEDED; + return doExecute(key_, value_); } else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { // regard key not found as succeed as well, upper node will handle it return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -62,6 +61,13 @@ class TagNode final : public IterateNode { return ret; } + nebula::cpp2::ErrorCode doExecute(const std::string& key, const std::string& value) { + key_ = key; + value_ = value; + resetReader(); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + nebula::cpp2::ErrorCode collectTagPropsIfValid(NullHandler nullHandler, PropHandler valueHandler) { if (!valid()) { @@ -83,7 +89,16 @@ class TagNode final : public IterateNode { RowReader* reader() const override { return reader_.get(); } - const std::string& getTagName() { return tagName_; } + const std::string& getTagName() const { return tagName_; } + + TagID tagId() const { return tagId_; } + + void clear() { + valid_ = false; + key_.clear(); + value_.clear(); + reader_.reset(); + } private: void resetReader() { diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 5e3cbc52c66..5da9b6425e6 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -87,20 +87,13 @@ StoragePlan ScanEdgeProcessor::buildPlan( nebula::DataSet* result, std::unordered_map* cursors) { StoragePlan plan; - std::vector>> edges; - std::unordered_set edgeTypes; + std::vector> edges; for (const auto& ec : edgeContext_.propContexts_) { edges.emplace_back( std::make_unique(context, &edgeContext_, ec.first, &ec.second)); - edgeTypes.emplace(ec.first); } - auto output = std::make_unique(context, - std::move(edgeTypes), - std::move(edges), - enableReadFollower_, - limit_, - cursors, - result); + auto output = std::make_unique( + context, std::move(edges), enableReadFollower_, limit_, cursors, result); plan.addNode(std::move(output)); return plan; From 57ba9c5bb60e94a6c27fd0f674efef5482d11c43 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Tue, 9 Nov 2021 11:56:02 +0800 Subject: [PATCH 08/10] Fix compile error. --- src/storage/exec/ScanNode.h | 68 ++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 1b92f742fb4..0943b5320ae 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -5,6 +5,8 @@ #pragma once +#include + #include "common/base/Base.h" #include "storage/exec/GetPropNode.h" @@ -58,7 +60,6 @@ class ScanVertexPropNode : public QueryNode { } const auto rowLimit = limit_; - RowReaderWrapper reader; auto vIdLen = context_->vIdLen(); auto isIntId = context_->isIntId(); std::string currentVertexId; @@ -214,7 +215,6 @@ class ScanEdgePropNode : public QueryNode { } auto rowLimit = limit_; - RowReaderWrapper reader; auto vIdLen = context_->vIdLen(); auto isIntId = context_->isIntId(); for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; @@ -230,35 +230,7 @@ class ScanEdgePropNode : public QueryNode { } auto value = iter->val(); edgeNodes_[edgeNodeIndex->second]->doExecute(key.toString(), value.toString()); - - List row; - for (auto& edgeNode : edgeNodes_) { - ret = edgeNode->collectEdgePropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { - for (const auto& prop : *props) { - if (prop.returned_) { - row.emplace_back(Value()); - } - } - return nebula::cpp2::ErrorCode::SUCCEEDED; - }, - [&row, vIdLen, isIntId]( - folly::StringPiece key, - RowReader* reader, - const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { - return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; - } - return nebula::cpp2::ErrorCode::SUCCEEDED; - }); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - return ret; - } - } - resultDataSet_->rows.emplace_back(std::move(row)); - for (auto& edgeNode : edgeNodes_) { - edgeNode->clear(); - } + collectOneRow(isIntId, vIdLen); } cpp2::ScanCursor c; @@ -272,6 +244,40 @@ class ScanEdgePropNode : public QueryNode { return nebula::cpp2::ErrorCode::SUCCEEDED; } + void collectOneRow(bool isIntId, std::size_t vIdLen) { + List row; + nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; + for (auto& edgeNode : edgeNodes_) { + ret = edgeNode->collectEdgePropsIfValid( + [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + for (const auto& prop : *props) { + if (prop.returned_) { + row.emplace_back(Value()); + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }, + [&row, vIdLen, isIntId]( + folly::StringPiece key, + RowReader* reader, + const std::vector* props) -> nebula::cpp2::ErrorCode { + if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + break; + } + } + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + for (auto& edgeNode : edgeNodes_) { + edgeNode->clear(); + } + } + private: RuntimeContext* context_; std::vector> edgeNodes_; From e5025464e3240421bd4e66c37e53b8e4a93c77d5 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Tue, 9 Nov 2021 16:17:24 +0800 Subject: [PATCH 09/10] Skip invalid tag in one loop. --- src/storage/exec/ScanNode.h | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 0943b5320ae..bd4dc6138f5 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -5,8 +5,6 @@ #pragma once -#include - #include "common/base/Base.h" #include "storage/exec/GetPropNode.h" @@ -63,18 +61,6 @@ class ScanVertexPropNode : public QueryNode { auto vIdLen = context_->vIdLen(); auto isIntId = context_->isIntId(); std::string currentVertexId; - for (; iter->valid(); iter->next()) { - auto key = iter->key(); - auto tagId = NebulaKeyUtils::getTagId(vIdLen, key); - auto tagIdIndex = tagNodesIndex_.find(tagId); - if (tagIdIndex != tagNodesIndex_.end()) { - break; - } - } - if (iter->valid()) { - auto vIdSlice = NebulaKeyUtils::getVertexId(vIdLen, iter->key()); - currentVertexId = vIdSlice.subpiece(0, vIdSlice.find_first_of('\0')).toString(); - } for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; iter->next()) { auto key = iter->key(); @@ -83,12 +69,11 @@ class ScanVertexPropNode : public QueryNode { if (tagIdIndex == tagNodesIndex_.end()) { continue; } - auto vIdSlice = NebulaKeyUtils::getVertexId(vIdLen, key); - auto vertexId = vIdSlice.subpiece(0, vIdSlice.find_first_of('\0')); - if (vertexId != currentVertexId) { + auto vertexId = NebulaKeyUtils::getVertexId(vIdLen, key); + if (vertexId != currentVertexId && !currentVertexId.empty()) { collectOneRow(isIntId, vIdLen, currentVertexId); - currentVertexId = vertexId; } // collect vertex row + currentVertexId = vertexId; if (static_cast(resultDataSet_->rowSize()) >= rowLimit) { break; } @@ -121,7 +106,7 @@ class ScanVertexPropNode : public QueryNode { if (isIntId) { row.emplace_back(*reinterpret_cast(currentVertexId.data())); } else { - row.emplace_back(currentVertexId); + row.emplace_back(currentVertexId.c_str()); } // if none of the tag node valid, do not emplace the row if (std::any_of(tagNodes_.begin(), tagNodes_.end(), [](const auto& tagNode) { From 96b00e601e4b95f8bbd1df872df60997752daf1a Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Tue, 9 Nov 2021 16:32:35 +0800 Subject: [PATCH 10/10] Avoid extra logical. --- src/storage/exec/ScanNode.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index bd4dc6138f5..3778eb87804 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -82,10 +82,6 @@ class ScanVertexPropNode : public QueryNode { } // iterate key if (static_cast(resultDataSet_->rowSize()) < rowLimit) { collectOneRow(isIntId, vIdLen, currentVertexId); - } else { - for (auto& tagNode : tagNodes_) { - tagNode->clear(); - } } cpp2::ScanCursor c;