Skip to content

Commit

Permalink
Profile storage GetDstBySrcProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Nov 16, 2022
1 parent 150e418 commit 26c8a87
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 44 deletions.
2 changes: 2 additions & 0 deletions src/storage/context/StorageExpressionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class StorageExpressionContext final : public ExpressionContext {
Value getVertex(const std::string& name = "") const override {
UNUSED(name);
LOG(FATAL) << "Unimplemented";
return Value::kNullBadData;
}

/**
Expand All @@ -194,6 +195,7 @@ class StorageExpressionContext final : public ExpressionContext {
*/
Value getEdge() const override {
LOG(FATAL) << "Unimplemented";
return Value::kNullBadData;
}

/**
Expand Down
7 changes: 4 additions & 3 deletions src/storage/exec/GetDstBySrcNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class GetDstBySrcNode : public QueryNode<VertexID> {
GetDstBySrcNode(RuntimeContext* context,
const std::vector<SingleEdgeNode*>& edgeNodes,
EdgeContext* edgeContext,
nebula::List* result)
std::deque<Value>* result)
: context_(context), edgeNodes_(edgeNodes), edgeContext_(edgeContext), result_(result) {
name_ = "GetDstBySrcNode";
}
Expand All @@ -32,6 +32,7 @@ class GetDstBySrcNode : public QueryNode<VertexID> {
}

std::vector<SingleEdgeIterator*> iters;
iters.reserve(edgeNodes_.size());
for (auto* edgeNode : edgeNodes_) {
iters.emplace_back(edgeNode->iter());
}
Expand Down Expand Up @@ -62,7 +63,7 @@ class GetDstBySrcNode : public QueryNode<VertexID> {
.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
}
result_->values.emplace_back(std::move(list.values[0]));
result_->emplace_back(std::move(list.values[0]));
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand All @@ -85,7 +86,7 @@ class GetDstBySrcNode : public QueryNode<VertexID> {
RuntimeContext* context_;
std::vector<SingleEdgeNode*> edgeNodes_;
EdgeContext* edgeContext_;
nebula::List* result_;
std::deque<Value>* result_;
std::unique_ptr<MultiEdgeIterator> iter_;
};

Expand Down
3 changes: 2 additions & 1 deletion src/storage/exec/StoragePlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class StoragePlan {
CHECK_LT(idx, nodes_.size());
return nodes_[idx].get();
}
const std::vector<std::unique_ptr<RelNode<T>>>& getNodes() {

const std::vector<std::unique_ptr<RelNode<T>>>& getNodes() const {
return nodes_;
}

Expand Down
6 changes: 0 additions & 6 deletions src/storage/mutate/UpdateEdgeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ class UpdateEdgeProcessor
std::vector<Expression*> getReturnPropsExp() {
return returnPropsExp_;
}
void profilePlan(StoragePlan<cpp2::EdgeKey>& plan) {
auto& nodes = plan.getNodes();
for (auto& node : nodes) {
profileDetail(node->name_, node->duration_.elapsedInUSec());
}
}

private:
std::unique_ptr<RuntimeContext> context_;
Expand Down
6 changes: 0 additions & 6 deletions src/storage/mutate/UpdateVertexProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@ class UpdateVertexProcessor
// result.begin(), get); return result;
return returnPropsExp_;
}
void profilePlan(StoragePlan<VertexID>& plan) {
auto& nodes = plan.getNodes();
for (auto& node : nodes) {
profileDetail(node->name_, node->duration_.elapsedInUSec());
}
}

private:
std::unique_ptr<RuntimeContext> context_;
Expand Down
53 changes: 37 additions & 16 deletions src/storage/query/GetDstBySrcProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ void GetDstBySrcProcessor::process(const cpp2::GetDstBySrcRequest& req) {
}

void GetDstBySrcProcessor::doProcess(const cpp2::GetDstBySrcRequest& req) {
if (req.common_ref().has_value() && req.get_common()->profile_detail_ref().value_or(false)) {
profileDetailFlag_ = true;
profileDetail("GetDstBySrcProcessor", 0);
profileDetail("Dedup", 0);
}

spaceId_ = req.get_space_id();
auto retCode = getSpaceVidLen(spaceId_);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -84,6 +90,10 @@ void GetDstBySrcProcessor::runInSingleThread(const cpp2::GetDstBySrcRequest& req
}
}
}

if (UNLIKELY(profileDetailFlag_)) {
profilePlan(plan);
}
onProcessFinished();
onFinished();
}
Expand All @@ -104,25 +114,25 @@ void GetDstBySrcProcessor::runInMultipleThread(const cpp2::GetDstBySrcRequest& r
CHECK(!t.hasException());
const auto& tries = t.value();

size_t sum = 0;
for (size_t j = 0; j < tries.size(); j++) {
const auto& [code, partId] = tries[j].value();
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
sum += partResults_[j].values.size();
}
}
flatResult_.values.reserve(sum);
// size_t sum = 0;
// for (size_t j = 0; j < tries.size(); j++) {
// const auto& [code, partId] = tries[j].value();
// if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
// sum += partResults_[j].size();
// }
// }
// flatResult_.reserve(sum);

for (size_t j = 0; j < tries.size(); j++) {
DCHECK(!tries[j].hasException());
const auto& [code, partId] = tries[j].value();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(code, spaceId_, partId);
} else {
for (auto& v : partResults_[j].values) {
flatResult_.values.emplace_back(std::move(v));
for (auto& v : partResults_[j]) {
flatResult_.emplace_back(std::move(v));
}
std::vector<Value>().swap(partResults_[j].values);
std::deque<Value>().swap(partResults_[j]);
}
}

Expand All @@ -133,7 +143,7 @@ void GetDstBySrcProcessor::runInMultipleThread(const cpp2::GetDstBySrcRequest& r

folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> GetDstBySrcProcessor::runInExecutor(
RuntimeContext* context,
nebula::List* result,
std::deque<Value>* result,
PartitionID partId,
const std::vector<Value>& srcIds) {
return folly::via(executor_,
Expand All @@ -150,6 +160,9 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> GetDstBySrcProces

// the first column of each row would be the vertex id
auto ret = plan.go(partId, vId);
// if (UNLIKELY(profileDetailFlag_)) {
// profilePlan(plan);
// }
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return std::make_pair(ret, partId);
}
Expand All @@ -159,7 +172,7 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> GetDstBySrcProces
}

StoragePlan<VertexID> GetDstBySrcProcessor::buildPlan(RuntimeContext* context,
nebula::List* result) {
std::deque<Value>* result) {
/*
The StoragePlan looks like this:
+------------------+
Expand Down Expand Up @@ -230,16 +243,19 @@ nebula::cpp2::ErrorCode GetDstBySrcProcessor::buildEdgeContext(
}

void GetDstBySrcProcessor::onProcessFinished() {
if (profileDetailFlag_) {
dedupDuration_.reset();
}
// dedup the dsts before we return
static const auto kConcurrentThreshold = FLAGS_concurrent_dedup_threshold;
static const auto kMaxThreads = FLAGS_max_dedup_threads;
auto nRows = flatResult_.values.size();
auto nRows = flatResult_.size();
std::vector<Row> deduped;
using HashSet = robin_hood::unordered_flat_set<Value, std::hash<Value>>;
if (nRows < kConcurrentThreshold * 2) {
HashSet unique;
unique.reserve(nRows);
for (const auto& val : flatResult_.values) {
for (const auto& val : flatResult_) {
unique.emplace(val);
}
deduped.reserve(unique.size());
Expand Down Expand Up @@ -271,7 +287,7 @@ void GetDstBySrcProcessor::onProcessFinished() {
auto start = ranges[idx].first;
auto end = ranges[idx].second;
for (auto j = start; j < end; j++) {
sets[idx].emplace(std::move(flatResult_.values[j]));
sets[idx].emplace(std::move(flatResult_[j]));
}
};

Expand Down Expand Up @@ -302,6 +318,11 @@ void GetDstBySrcProcessor::onProcessFinished() {
}
resultDataSet_.rows = std::move(deduped);
resp_.dsts_ref() = std::move(resultDataSet_);

if (profileDetailFlag_) {
profileDetail("Dedup", dedupDuration_.elapsedInUSec());
profileDetail("GetDstBySrcProcessor", totalDuration_.elapsedInUSec());
}
}

} // namespace storage
Expand Down
13 changes: 9 additions & 4 deletions src/storage/query/GetDstBySrcProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#ifndef STORAGE_QUERY_GETDSTBYSRCPROCESSOR_H_
#define STORAGE_QUERY_GETDSTBYSRCPROCESSOR_H_

#include <deque>

#include "common/base/Base.h"
#include "storage/BaseProcessor.h"
#include "storage/exec/StoragePlan.h"
Expand Down Expand Up @@ -48,17 +50,20 @@ class GetDstBySrcProcessor

folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> runInExecutor(
RuntimeContext* context,
nebula::List* result,
std::deque<Value>* result,
PartitionID partId,
const std::vector<Value>& srcIds);

StoragePlan<VertexID> buildPlan(RuntimeContext* context, nebula::List* result);
StoragePlan<VertexID> buildPlan(RuntimeContext* context, std::deque<Value>* result);

private:
std::vector<RuntimeContext> contexts_;
// The process result of each part if run concurrently, then merge into resultDataSet_ at last
std::vector<nebula::List> partResults_;
nebula::List flatResult_;
std::vector<std::deque<Value>> partResults_;
std::deque<Value> flatResult_;

time::Duration totalDuration_;
time::Duration dedupDuration_;
};

} // namespace storage
Expand Down
7 changes: 0 additions & 7 deletions src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,12 +469,5 @@ void GetNeighborsProcessor::onProcessFinished() {
resp_.vertices_ref() = std::move(resultDataSet_);
}

void GetNeighborsProcessor::profilePlan(StoragePlan<VertexID>& plan) {
auto& nodes = plan.getNodes();
std::lock_guard<std::mutex> lck(BaseProcessor<cpp2::GetNeighborsResponse>::profileMut_);
for (auto& node : nodes) {
profileDetail(node->name_, node->duration_.elapsedInUSec());
}
}
} // namespace storage
} // namespace nebula
1 change: 0 additions & 1 deletion src/storage/query/GetNeighborsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class GetNeighborsProcessor
const std::vector<nebula::Value>& vids,
int64_t limit,
bool random);
void profilePlan(StoragePlan<VertexID>& plan);

private:
std::vector<RuntimeContext> contexts_;
Expand Down
10 changes: 10 additions & 0 deletions src/storage/query/QueryBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -670,5 +670,15 @@ void QueryBaseProcessor<REQ, RESP>::addPropContextIfNotExists(
}
}

template <typename REQ, typename RESP>
template <typename IdType>
void QueryBaseProcessor<REQ, RESP>::profilePlan(const StoragePlan<IdType>& plan) {
auto& nodes = plan.getNodes();
std::lock_guard<std::mutex> lck(BaseProcessor<RESP>::profileMut_);
for (auto& node : nodes) {
BaseProcessor<RESP>::profileDetail(node->name_, node->duration_.elapsedInUSec());
}
}

} // namespace storage
} // namespace nebula
6 changes: 6 additions & 0 deletions src/storage/query/QueryBaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
namespace nebula {
namespace storage {

template <typename IdType>
class StoragePlan;

/**
* @brief The PropContext stores the info about property to be returned or filtered
*
Expand Down Expand Up @@ -200,6 +203,9 @@ class QueryBaseProcessor : public BaseProcessor<RESP> {
bool filtered,
const std::pair<size_t, cpp2::StatType>* statInfo = nullptr);

template <typename IdType>
void profilePlan(const StoragePlan<IdType>& plan);

protected:
GraphSpaceID spaceId_;
folly::Executor* executor_{nullptr};
Expand Down

0 comments on commit 26c8a87

Please sign in to comment.