Skip to content

Commit

Permalink
GetDstBySrc (#4531)
Browse files Browse the repository at this point in the history
* support basic feature

* dedup by pointer

* return DataSet instead of List

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
critical27 and Sophie-Xie authored Aug 18, 2022
1 parent 6317917 commit af1e1e8
Show file tree
Hide file tree
Showing 13 changed files with 688 additions and 11 deletions.
7 changes: 7 additions & 0 deletions src/common/datatypes/List.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ struct List {
values.emplace_back(std::forward<T>(v));
}

void append(List&& other) {
values.reserve(size() + other.size());
values.insert(values.end(),
std::make_move_iterator(other.values.begin()),
std::make_move_iterator(other.values.end()));
}

void clear() {
values.clear();
}
Expand Down
18 changes: 17 additions & 1 deletion src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ struct GetNeighborsResponse {
* End of GetNeighbors section
*/

struct GetDstBySrcRequest {
1: common.GraphSpaceID space_id,
// list of srcId in each part
2: map<common.PartitionID, list<common.Value>>
(cpp.template = "std::unordered_map") parts,
3: list<common.EdgeType> edge_types,
4: optional RequestCommon common,
}

struct GetDstBySrcResponse {
1: required ResponseCommon result,
// Only one dst column, each row is a dst
2: optional common.DataSet dsts,
}


//
// Response for data modification requests
Expand Down Expand Up @@ -661,7 +676,8 @@ struct KVRemoveRequest {
}

service GraphStorageService {
GetNeighborsResponse getNeighbors(1: GetNeighborsRequest req)
GetNeighborsResponse getNeighbors(1: GetNeighborsRequest req);
GetDstBySrcResponse getDstBySrc(1: GetDstBySrcRequest req);

// Get vertex or edge properties
GetPropResponse getProps(1: GetPropRequest req);
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ nebula_add_library(
mutate/UpdateVertexProcessor.cpp
mutate/UpdateEdgeProcessor.cpp
query/GetNeighborsProcessor.cpp
query/GetDstBySrcProcessor.cpp
query/GetPropProcessor.cpp
query/ScanVertexProcessor.cpp
query/ScanEdgeProcessor.cpp
Expand Down
8 changes: 8 additions & 0 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "storage/mutate/DeleteVerticesProcessor.h"
#include "storage/mutate/UpdateEdgeProcessor.h"
#include "storage/mutate/UpdateVertexProcessor.h"
#include "storage/query/GetDstBySrcProcessor.h"
#include "storage/query/GetNeighborsProcessor.h"
#include "storage/query/GetPropProcessor.h"
#include "storage/query/ScanEdgeProcessor.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ GraphStorageServiceHandler::GraphStorageServiceHandler(StorageEnv* env) : env_(e
kUpdateVertexCounters.init("update_vertex");
kUpdateEdgeCounters.init("update_edge");
kGetNeighborsCounters.init("get_neighbors");
kGetDstBySrcCounters.init("get_dst_by_src");
kGetPropCounters.init("get_prop");
kLookupCounters.init("lookup");
kScanVertexCounters.init("scan_vertex");
Expand Down Expand Up @@ -124,6 +126,12 @@ folly::Future<cpp2::GetNeighborsResponse> GraphStorageServiceHandler::future_get
RETURN_FUTURE(processor);
}

folly::Future<cpp2::GetDstBySrcResponse> GraphStorageServiceHandler::future_getDstBySrc(
const cpp2::GetDstBySrcRequest& req) {
auto* processor = GetDstBySrcProcessor::instance(env_, &kGetDstBySrcCounters, readerPool_.get());
RETURN_FUTURE(processor);
}

folly::Future<cpp2::GetPropResponse> GraphStorageServiceHandler::future_getProps(
const cpp2::GetPropRequest& req) {
auto* processor = GetPropProcessor::instance(env_, &kGetPropCounters, readerPool_.get());
Expand Down
3 changes: 3 additions & 0 deletions src/storage/GraphStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {
folly::Future<cpp2::GetNeighborsResponse> future_getNeighbors(
const cpp2::GetNeighborsRequest& req) override;

folly::Future<cpp2::GetDstBySrcResponse> future_getDstBySrc(
const cpp2::GetDstBySrcRequest& req) override;

folly::Future<cpp2::GetPropResponse> future_getProps(const cpp2::GetPropRequest& req) override;

folly::Future<cpp2::LookupIndexResp> future_lookupIndex(
Expand Down
95 changes: 95 additions & 0 deletions src/storage/exec/GetDstBySrcNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef STORAGE_EXEC_GETDSTBYSRCNODE_H_
#define STORAGE_EXEC_GETDSTBYSRCNODE_H_

#include "common/base/Base.h"

namespace nebula {
namespace storage {

class GetDstBySrcNode : public QueryNode<VertexID> {
public:
using RelNode::doExecute;

GetDstBySrcNode(RuntimeContext* context,
const std::vector<SingleEdgeNode*>& edgeNodes,
EdgeContext* edgeContext,
nebula::DataSet* result)
: context_(context), edgeNodes_(edgeNodes), edgeContext_(edgeContext), result_(result) {
name_ = "GetDstBySrcNode";
}

// need to override doExecute because the return format of GetNeighborsNode and
// GetDstBySrcNode are different
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}

std::vector<SingleEdgeIterator*> iters;
for (auto* edgeNode : edgeNodes_) {
iters.emplace_back(edgeNode->iter());
}
iter_.reset(new MultiEdgeIterator(std::move(iters)));
if (iter_->valid()) {
setCurrentEdgeInfo(iter_->edgeType());
}
return iterateEdges();
}

private:
nebula::cpp2::ErrorCode iterateEdges() {
for (; iter_->valid(); iter_->next()) {
EdgeType type = iter_->edgeType();
if (type != context_->edgeType_) {
// update info when edgeType changes while iterating over different edge types
setCurrentEdgeInfo(type);
}
auto key = iter_->key();
auto reader = iter_->reader();
auto props = context_->props_;
DCHECK_EQ(props->size(), 1);

nebula::List list;
// collect props need to return
if (!QueryUtils::collectEdgeProps(
key, context_->vIdLen(), context_->isIntId(), reader, props, list)
.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
}
result_->rows.emplace_back(std::move(list));
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void setCurrentEdgeInfo(EdgeType type) {
auto idxIter = edgeContext_->indexMap_.find(type);
CHECK(idxIter != edgeContext_->indexMap_.end());
auto schemaIter = edgeContext_->schemas_.find(std::abs(type));
CHECK(schemaIter != edgeContext_->schemas_.end());
CHECK(!schemaIter->second.empty());

context_->edgeSchema_ = schemaIter->second.back().get();
auto idx = idxIter->second;
context_->edgeType_ = type;
context_->edgeName_ = edgeNodes_[iter_->getIdx()]->getEdgeName();
context_->props_ = &(edgeContext_->propContexts_[idx].second);
}

private:
RuntimeContext* context_;
std::vector<SingleEdgeNode*> edgeNodes_;
EdgeContext* edgeContext_;
nebula::DataSet* result_;
std::unique_ptr<MultiEdgeIterator> iter_;
};

} // namespace storage
} // namespace nebula

#endif // STORAGE_EXEC_GETDSTBYSRCNODE_H_
1 change: 1 addition & 0 deletions src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class GetNeighborsSampleNode : public GetNeighborsNode {
int64_t limit)
: GetNeighborsNode(context, hashJoinNode, upstream, edgeContext, resultDataSet, limit) {
sampler_ = std::make_unique<nebula::algorithm::ReservoirSampling<Sample>>(limit);
name_ = "GetNeighborsSampleNode";
}

private:
Expand Down
6 changes: 0 additions & 6 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ class HashJoinNode : public IterateNode<VertexID> {

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
[&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
result.values.emplace_back(Value());
Expand Down Expand Up @@ -93,9 +90,6 @@ class HashJoinNode : public IterateNode<VertexID> {

std::vector<SingleEdgeIterator*> iters;
for (auto* edgeNode : edgeNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
iters.emplace_back(edgeNode->iter());
}
iter_.reset(new MultiEdgeIterator(std::move(iters)));
Expand Down
3 changes: 0 additions & 3 deletions src/storage/exec/MultiTagNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ class MultiTagNode : public IterateNode<VertexID> {

// add result of each tag node to tagResult
for (auto* tagNode : tagNodes_) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
ret = tagNode->collectTagPropsIfValid(
[&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
result.values.emplace_back(Value());
Expand Down
Loading

0 comments on commit af1e1e8

Please sign in to comment.