Skip to content

Commit

Permalink
profiling storage detail
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs committed Aug 30, 2021
1 parent f490d4d commit f7ca01d
Show file tree
Hide file tree
Showing 32 changed files with 204 additions and 71 deletions.
9 changes: 9 additions & 0 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,17 @@ class StorageAccessExecutor : public Executor {
auto &hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
auto &info = hostLatency[i];
auto &response = resp.responses()[i];
stats.emplace(folly::stringPrintf("%s exec/total", std::get<0>(info).toString().c_str()),
folly::stringPrintf("%d(us)/%d(us)", std::get<1>(info), std::get<2>(info)));
if (response.result_ref()->latency_detail_us_ref().has_value()) {
std::string storage_detail = "{";
for (auto iter : (*response.result_ref()->latency_detail_us_ref())) {
storage_detail += folly::stringPrintf("%s:%d(us),", iter.first.data(), iter.second);
}
storage_detail += "}";
stats.emplace("storage_detail", storage_detail);
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
folly::stringPrintf("%s exec/total/vertices", std::get<0>(info).toString().c_str()),
folly::stringPrintf(
"%d(us)/%d(us)/%lu,", std::get<1>(info), std::get<2>(info), size));
if (result.result.latency_detail_us_ref().has_value()) {
std::string storage_detail = "{";
for (auto iter : (*result.result.latency_detail_us_ref())) {
storage_detail += folly::stringPrintf("%s:%d(us),", iter.first.data(), iter.second);
}
storage_detail += "}";
otherStats_.emplace("storage_detail", storage_detail);
}
}
return handleResponse(resp);
});
Expand Down
1 change: 1 addition & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct ResponseCommon {
1: required list<PartitionResult> failed_parts,
// Query latency from storage service
2: required i32 latency_in_us,
3: optional map<string,i32> latency_detail_us,
}


Expand Down
13 changes: 13 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class BaseProcessor {
}

this->result_.set_latency_in_us(this->duration_.elapsedInUSec());
if (!profile_detail_.empty()) {
this->result_.set_latency_detail_us(std::move(profile_detail_));
}
this->result_.set_failed_parts(this->codes_);
this->resp_.set_result(std::move(this->result_));
this->promise_.setValue(std::move(this->resp_));
Expand Down Expand Up @@ -102,6 +105,14 @@ class BaseProcessor {
const std::vector<Value>& props,
WriteResult& wRet);

virtual void profile_detail(const std::string& name, int32_t latency) {
if (!profile_detail_.count(name)) {
profile_detail_[name] = latency;
} else {
profile_detail_[name] += latency;
}
}

protected:
StorageEnv* env_{nullptr};
const ProcessorCounters* counters_;
Expand All @@ -116,6 +127,8 @@ class BaseProcessor {
int32_t callingNum_{0};
int32_t spaceVidLen_;
bool isIntId_;
std::map<std::string, int32_t> profile_detail_;
std::mutex profile_mut_;
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ DEFINE_bool(query_concurrently,
false,
"whether to run query of each part concurrently, only lookup and "
"go are supported");
DEFINE_bool(profile_storage_detail, false, "Whether to profile storage plan detail");
2 changes: 2 additions & 0 deletions src/storage/StorageFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ DECLARE_int32(max_edge_returned_per_vertex);

DECLARE_bool(query_concurrently);

DECLARE_bool(profile_storage_detail);

#endif // STORAGE_STORAGEFLAGS_H_
14 changes: 8 additions & 6 deletions src/storage/exec/AggregateNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ struct PropStat {
template <typename T>
class AggregateNode : public IterateNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

AggregateNode(RuntimeContext* context, IterateNode<T>* upstream, EdgeContext* edgeContext)
: IterateNode<T>(upstream), context_(context), edgeContext_(edgeContext) {}
: IterateNode<T>(upstream), context_(context), edgeContext_(edgeContext) {
IterateNode<T>::name_ = "AggregateNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const T& input) override {
auto ret = RelNode<T>::execute(partId, input);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const T& input) override {
auto ret = RelNode<T>::doExecute(partId, input);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand All @@ -51,8 +53,8 @@ class AggregateNode : public IterateNode<T> {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/exec/DeDupNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ namespace storage {
template <typename T>
class DeDupNode : public IterateNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

explicit DeDupNode(nebula::DataSet* resultSet, const std::vector<size_t>& pos)
: resultSet_(resultSet), pos_(pos) {}
: resultSet_(resultSet), pos_(pos) {
IterateNode<T>::name_ = "DedupNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
25 changes: 16 additions & 9 deletions src/storage/exec/EdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ class EdgeNode : public IterateNode<T> {
schemas_ = &(schemaIter->second);
ttl_ = QueryUtils::getEdgeTTLInfo(edgeContext_, std::abs(edgeType_));
edgeName_ = edgeContext_->edgeNames_[edgeType_];
IterateNode<T>::name_ = "EdgeNode";
}

EdgeNode(RuntimeContext* context, EdgeContext* ctx) : context_(context), edgeContext_(ctx) {}
EdgeNode(RuntimeContext* context, EdgeContext* ctx) : context_(context), edgeContext_(ctx) {
IterateNode<T>::name_ = "EdgeNode";
}

RuntimeContext* context_;
EdgeContext* edgeContext_;
Expand All @@ -71,15 +74,17 @@ class EdgeNode : public IterateNode<T> {
// FetchEdgeNode is used to fetch a single edge
class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {
public:
using RelNode::execute;
using RelNode::doExecute;

FetchEdgeNode(RuntimeContext* context,
EdgeContext* edgeContext,
EdgeType edgeType,
const std::vector<PropContext>* props,
StorageExpressionContext* expCtx = nullptr,
Expression* exp = nullptr)
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {}
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {
name_ = "FetchEdgeNode";
}

bool valid() const override { return valid_; }

Expand All @@ -91,9 +96,9 @@ class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {

RowReader* reader() const override { return reader_.get(); }

nebula::cpp2::ErrorCode execute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
valid_ = false;
auto ret = RelNode::execute(partId, edgeKey);
auto ret = RelNode::doExecute(partId, edgeKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down Expand Up @@ -143,14 +148,16 @@ class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {
// srcId
class SingleEdgeNode final : public EdgeNode<VertexID> {
public:
using RelNode::execute;
using RelNode::doExecute;
SingleEdgeNode(RuntimeContext* context,
EdgeContext* edgeContext,
EdgeType edgeType,
const std::vector<PropContext>* props,
StorageExpressionContext* expCtx = nullptr,
Expression* exp = nullptr)
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {}
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {
name_ = "SingleEdgeNode";
}

SingleEdgeIterator* iter() { return iter_.get(); }

Expand All @@ -164,8 +171,8 @@ class SingleEdgeNode final : public EdgeNode<VertexID> {

RowReader* reader() const override { return iter_->reader(); }

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
12 changes: 7 additions & 5 deletions src/storage/exec/FilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,24 @@ data, but not both.
As for GetNeighbors, it will have filter that involves both tag and edge
expression. In that case, FilterNode has a upstream of HashJoinNode, which will
keep popping out edge data. All tage data has been put into ExpressionContext
before FilterNode is executed. By that means, it can check the filter of tag +
before FilterNode is doExecuted. By that means, it can check the filter of tag +
edge.
*/
template <typename T>
class FilterNode : public IterateNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

FilterNode(RuntimeContext* context,
IterateNode<T>* upstream,
StorageExpressionContext* expCtx = nullptr,
Expression* exp = nullptr)
: IterateNode<T>(upstream), context_(context), expCtx_(expCtx), filterExp_(exp) {}
: IterateNode<T>(upstream), context_(context), expCtx_(expCtx), filterExp_(exp) {
IterateNode<T>::name_ = "FilterNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const T& vId) override {
auto ret = RelNode<T>::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const T& vId) override {
auto ret = RelNode<T>::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace storage {
// target cell of a row.
class GetNeighborsNode : public QueryNode<VertexID> {
public:
using RelNode::execute;
using RelNode::doExecute;

GetNeighborsNode(RuntimeContext* context,
IterateNode<VertexID>* hashJoinNode,
Expand All @@ -35,10 +35,12 @@ class GetNeighborsNode : public QueryNode<VertexID> {
upstream_(upstream),
edgeContext_(edgeContext),
resultDataSet_(resultDataSet),
limit_(limit) {}
limit_(limit) {
name_ = "GetNeighborsNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
20 changes: 12 additions & 8 deletions src/storage/exec/GetPropNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ namespace storage {

class GetTagPropNode : public QueryNode<VertexID> {
public:
using RelNode<VertexID>::execute;
using RelNode<VertexID>::doExecute;

explicit GetTagPropNode(RuntimeContext* context,
std::vector<TagNode*> tagNodes,
nebula::DataSet* resultDataSet)
: context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet) {}
: context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet) {
name_ = "GetTagPropNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down Expand Up @@ -80,15 +82,17 @@ class GetTagPropNode : public QueryNode<VertexID> {

class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
public:
using RelNode::execute;
using RelNode::doExecute;

GetEdgePropNode(RuntimeContext* context,
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes,
nebula::DataSet* resultDataSet)
: context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet) {}
: context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet) {
QueryNode::name_ = "GetEdgePropNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
auto ret = RelNode::execute(partId, edgeKey);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
auto ret = RelNode::doExecute(partId, edgeKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
7 changes: 4 additions & 3 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace storage {
// return a iterator of edges which can pass ttl check and ready to be read.
class HashJoinNode : public IterateNode<VertexID> {
public:
using RelNode::execute;
using RelNode::doExecute;

HashJoinNode(RuntimeContext* context,
const std::vector<TagNode*>& tagNodes,
Expand All @@ -38,10 +38,11 @@ class HashJoinNode : public IterateNode<VertexID> {
edgeContext_(edgeContext),
expCtx_(expCtx) {
UNUSED(tagContext_);
IterateNode::name_ = "HashJoinNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/exec/IndexEdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace storage {
template <typename T>
class IndexEdgeNode final : public RelNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

IndexEdgeNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
Expand All @@ -25,10 +25,12 @@ class IndexEdgeNode final : public RelNode<T> {
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
schemaName_(schemaName) {}
schemaName_(schemaName) {
RelNode<T>::name_ = "IndexEdgeNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
7 changes: 4 additions & 3 deletions src/storage/exec/IndexFilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace storage {
template <typename T>
class IndexFilterNode final : public RelNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

// evalExprByIndex_ is true, all fileds in filter is in index. No need to read
// data anymore.
Expand All @@ -30,6 +30,7 @@ class IndexFilterNode final : public RelNode<T> {
bool isEdge = false)
: indexScanNode_(indexScanNode), exprCtx_(exprCtx), filterExp_(exp), isEdge_(isEdge) {
evalExprByIndex_ = true;
RelNode<T>::name_ = "IndexFilterNode";
}

// evalExprByIndex_ is false, some fileds in filter is out of index, which
Expand All @@ -52,9 +53,9 @@ class IndexFilterNode final : public RelNode<T> {
isEdge_ = false;
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
data_.clear();
auto ret = RelNode<T>::execute(partId);
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
Loading

0 comments on commit f7ca01d

Please sign in to comment.