Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetDstBySrc #4531

Merged
merged 6 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/datatypes/List.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ struct List {
values.emplace_back(std::forward<T>(v));
}

void append(List&& other) {
values.reserve(size() + other.size());
values.insert(values.end(),
std::make_move_iterator(other.values.begin()),
std::make_move_iterator(other.values.end()));
}

void clear() {
values.clear();
}
Expand Down
18 changes: 17 additions & 1 deletion src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ struct GetNeighborsResponse {
* End of GetNeighbors section
*/

struct GetDstBySrcRequest {
1: common.GraphSpaceID space_id,
// list of srcId in each part
2: map<common.PartitionID, list<common.Value>>
(cpp.template = "std::unordered_map") parts,
3: list<common.EdgeType> edge_types,
4: optional RequestCommon common,
}

struct GetDstBySrcResponse {
1: required ResponseCommon result,
// Only one dst column, each row is a dst
2: optional common.DataSet dsts,
}


//
// Response for data modification requests
Expand Down Expand Up @@ -661,7 +676,8 @@ struct KVRemoveRequest {
}

service GraphStorageService {
GetNeighborsResponse getNeighbors(1: GetNeighborsRequest req)
GetNeighborsResponse getNeighbors(1: GetNeighborsRequest req);
GetDstBySrcResponse getDstBySrc(1: GetDstBySrcRequest req);

// Get vertex or edge properties
GetPropResponse getProps(1: GetPropRequest req);
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ nebula_add_library(
mutate/UpdateVertexProcessor.cpp
mutate/UpdateEdgeProcessor.cpp
query/GetNeighborsProcessor.cpp
query/GetDstBySrcProcessor.cpp
query/GetPropProcessor.cpp
query/ScanVertexProcessor.cpp
query/ScanEdgeProcessor.cpp
Expand Down
8 changes: 8 additions & 0 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "storage/mutate/DeleteVerticesProcessor.h"
#include "storage/mutate/UpdateEdgeProcessor.h"
#include "storage/mutate/UpdateVertexProcessor.h"
#include "storage/query/GetDstBySrcProcessor.h"
#include "storage/query/GetNeighborsProcessor.h"
#include "storage/query/GetPropProcessor.h"
#include "storage/query/ScanEdgeProcessor.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env) : env_(e
kUpdateVertexCounters.init("update_vertex");
kUpdateEdgeCounters.init("update_edge");
kGetNeighborsCounters.init("get_neighbors");
kGetDstBySrcCounters.init("get_dst_by_src");
kGetPropCounters.init("get_prop");
kLookupCounters.init("lookup");
kScanVertexCounters.init("scan_vertex");
Expand Down Expand Up @@ -124,6 +126,12 @@ folly::Future<cpp2::GetNeighborsResponse> GraphStorageServiceHandler::future_get
RETURN_FUTURE(processor);
}

folly::Future<cpp2::GetDstBySrcResponse> GraphStorageServiceHandler::future_getDstBySrc(
const cpp2::GetDstBySrcRequest& req) {
auto* processor = GetDstBySrcProcessor::instance(env_, &kGetDstBySrcCounters, readerPool_.get());
RETURN_FUTURE(processor);
}

folly::Future<cpp2::GetPropResponse> GraphStorageServiceHandler::future_getProps(
const cpp2::GetPropRequest& req) {
auto* processor = GetPropProcessor::instance(env_, &kGetPropCounters, readerPool_.get());
Expand Down
3 changes: 3 additions & 0 deletions src/storage/GraphStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {
folly::Future<cpp2::GetNeighborsResponse> future_getNeighbors(
const cpp2::GetNeighborsRequest& req) override;

folly::Future<cpp2::GetDstBySrcResponse> future_getDstBySrc(
const cpp2::GetDstBySrcRequest& req) override;

folly::Future<cpp2::GetPropResponse> future_getProps(const cpp2::GetPropRequest& req) override;

folly::Future<cpp2::LookupIndexResp> future_lookupIndex(
Expand Down
95 changes: 95 additions & 0 deletions src/storage/exec/GetDstBySrcNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef STORAGE_EXEC_GETDSTBYSRCNODE_H_
#define STORAGE_EXEC_GETDSTBYSRCNODE_H_

#include "common/base/Base.h"

namespace nebula {
namespace storage {

class GetDstBySrcNode : public QueryNode<VertexID> {
public:
using RelNode::doExecute;

GetDstBySrcNode(RuntimeContext* context,
const std::vector<SingleEdgeNode*>& edgeNodes,
EdgeContext* edgeContext,
nebula::DataSet* result)
: context_(context), edgeNodes_(edgeNodes), edgeContext_(edgeContext), result_(result) {
name_ = "GetDstBySrcNode";
}

// need to override doExecute because the return format of GetNeighborsNode and
// GetDstBySrcNode are different
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}

std::vector<SingleEdgeIterator*> iters;
for (auto* edgeNode : edgeNodes_) {
iters.emplace_back(edgeNode->iter());
}
iter_.reset(new MultiEdgeIterator(std::move(iters)));
if (iter_->valid()) {
setCurrentEdgeInfo(iter_->edgeType());
}
return iterateEdges();
}

private:
nebula::cpp2::ErrorCode iterateEdges() {
for (; iter_->valid(); iter_->next()) {
EdgeType type = iter_->edgeType();
if (type != context_->edgeType_) {
// update info when edgeType changes while iterating over different edge types
setCurrentEdgeInfo(type);
}
auto key = iter_->key();
auto reader = iter_->reader();
auto props = context_->props_;
DCHECK_EQ(props->size(), 1);

nebula::List list;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nebula::Row is more clear.

// collect props need to return
if (!QueryUtils::collectEdgeProps(
key, context_->vIdLen(), context_->isIntId(), reader, props, list)
.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
}
result_->rows.emplace_back(std::move(list));
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void setCurrentEdgeInfo(EdgeType type) {
auto idxIter = edgeContext_->indexMap_.find(type);
CHECK(idxIter != edgeContext_->indexMap_.end());
auto schemaIter = edgeContext_->schemas_.find(std::abs(type));
CHECK(schemaIter != edgeContext_->schemas_.end());
CHECK(!schemaIter->second.empty());

context_->edgeSchema_ = schemaIter->second.back().get();
auto idx = idxIter->second;
context_->edgeType_ = type;
context_->edgeName_ = edgeNodes_[iter_->getIdx()]->getEdgeName();
context_->props_ = &(edgeContext_->propContexts_[idx].second);
}

private:
RuntimeContext* context_;
std::vector<SingleEdgeNode*> edgeNodes_;
EdgeContext* edgeContext_;
nebula::DataSet* result_;
std::unique_ptr<MultiEdgeIterator> iter_;
};

} // namespace storage
} // namespace nebula

#endif // STORAGE_EXEC_GETDSTBYSRCNODE_H_
1 change: 1 addition & 0 deletions src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class GetNeighborsSampleNode : public GetNeighborsNode {
int64_t limit)
: GetNeighborsNode(context, hashJoinNode, upstream, edgeContext, resultDataSet, limit) {
sampler_ = std::make_unique<nebula::algorithm::ReservoirSampling<Sample>>(limit);
name_ = "GetNeighborsSampleNode";
}

private:
Expand Down
6 changes: 0 additions & 6 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ class HashJoinNode : public IterateNode<VertexID> {

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
[&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
result.values.emplace_back(Value());
Expand Down Expand Up @@ -93,9 +90,6 @@ class HashJoinNode : public IterateNode<VertexID> {

std::vector<SingleEdgeIterator*> iters;
for (auto* edgeNode : edgeNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
iters.emplace_back(edgeNode->iter());
}
iter_.reset(new MultiEdgeIterator(std::move(iters)));
Expand Down
3 changes: 0 additions & 3 deletions src/storage/exec/MultiTagNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ class MultiTagNode : public IterateNode<VertexID> {

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same Q

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have been check on a higher level node, this is redundant. All the check only exists in output node, just to keep unify with others

if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
[&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
result.values.emplace_back(Value());
Expand Down
Loading