Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiling storage detail #2754

Merged
9 changes: 9 additions & 0 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,17 @@ class StorageAccessExecutor : public Executor {
auto &hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
auto &info = hostLatency[i];
auto &response = resp.responses()[i];
stats.emplace(folly::stringPrintf("%s exec/total", std::get<0>(info).toString().c_str()),
folly::stringPrintf("%d(us)/%d(us)", std::get<1>(info), std::get<2>(info)));
if (response.result_ref()->latency_detail_us_ref().has_value()) {
std::string storageDetail = "{";
for (auto iter : (*response.result_ref()->latency_detail_us_ref())) {
storageDetail += folly::stringPrintf("%s:%d(us),", iter.first.data(), iter.second);
}
storageDetail += "}";
stats.emplace("storage_detail", storageDetail);
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
folly::stringPrintf("%s exec/total/vertices", std::get<0>(info).toString().c_str()),
folly::stringPrintf(
"%d(us)/%d(us)/%lu,", std::get<1>(info), std::get<2>(info), size));
if (result.result.latency_detail_us_ref().has_value()) {
std::string storageDetail = "{";
for (auto iter : (*result.result.latency_detail_us_ref())) {
storageDetail += folly::stringPrintf("%s:%d(us),", iter.first.data(), iter.second);
}
storageDetail += "}";
otherStats_.emplace("storage_detail", storageDetail);
}
}
return handleResponse(resp);
});
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/IndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
lookup->returnColumns())
.via(runner())
.thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
addStats(rpcResp, otherStats_);
return handleResp(std::move(rpcResp));
});
}
Expand Down
13 changes: 13 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class BaseProcessor {
}

this->result_.set_latency_in_us(this->duration_.elapsedInUSec());
if (!profileDetail_.empty()) {
this->result_.set_latency_detail_us(std::move(profileDetail_));
}
this->result_.set_failed_parts(this->codes_);
this->resp_.set_result(std::move(this->result_));
this->promise_.setValue(std::move(this->resp_));
Expand Down Expand Up @@ -102,6 +105,14 @@ class BaseProcessor {
const std::vector<Value>& props,
WriteResult& wRet);

virtual void profileDetail(const std::string& name, int32_t latency) {
if (!profileDetail_.count(name)) {
profileDetail_[name] = latency;
} else {
profileDetail_[name] += latency;
}
}

protected:
StorageEnv* env_{nullptr};
const ProcessorCounters* counters_;
Expand All @@ -116,6 +127,8 @@ class BaseProcessor {
int32_t callingNum_{0};
int32_t spaceVidLen_;
bool isIntId_;
std::map<std::string, int32_t> profileDetail_;
std::mutex profileMut_;
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ DEFINE_bool(query_concurrently,
false,
"whether to run query of each part concurrently, only lookup and "
"go are supported");
DEFINE_bool(profile_storage_detail, false, "Whether to profile storage plan detail");
2 changes: 2 additions & 0 deletions src/storage/StorageFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ DECLARE_int32(max_edge_returned_per_vertex);

DECLARE_bool(query_concurrently);

DECLARE_bool(profile_storage_detail);

#endif // STORAGE_STORAGEFLAGS_H_
14 changes: 8 additions & 6 deletions src/storage/exec/AggregateNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ struct PropStat {
template <typename T>
class AggregateNode : public IterateNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

AggregateNode(RuntimeContext* context, IterateNode<T>* upstream, EdgeContext* edgeContext)
: IterateNode<T>(upstream), context_(context), edgeContext_(edgeContext) {}
: IterateNode<T>(upstream), context_(context), edgeContext_(edgeContext) {
IterateNode<T>::name_ = "AggregateNode";
Copy link
Contributor

@yixinglu yixinglu Sep 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cangfengzhs

I really don't recommend this way to initialize the fields of the base class. Please provide the constructor in the base class to do this, such as:

IterateNode<T>::IterateNode<T>(const std::string& name, ...) : name_(name) {}

AggregateNode(RuntimeContext* context, IterateNode<T>* upstream, EdgeContext* edgeContext) 
  : IterateNode("AggregateNode", upstream), context_(context)...{}

}

nebula::cpp2::ErrorCode execute(PartitionID partId, const T& input) override {
auto ret = RelNode<T>::execute(partId, input);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const T& input) override {
auto ret = RelNode<T>::doExecute(partId, input);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand All @@ -51,8 +53,8 @@ class AggregateNode : public IterateNode<T> {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/exec/DeDupNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ namespace storage {
template <typename T>
class DeDupNode : public IterateNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

explicit DeDupNode(nebula::DataSet* resultSet, const std::vector<size_t>& pos)
: resultSet_(resultSet), pos_(pos) {}
: resultSet_(resultSet), pos_(pos) {
IterateNode<T>::name_ = "DedupNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
25 changes: 16 additions & 9 deletions src/storage/exec/EdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ class EdgeNode : public IterateNode<T> {
schemas_ = &(schemaIter->second);
ttl_ = QueryUtils::getEdgeTTLInfo(edgeContext_, std::abs(edgeType_));
edgeName_ = edgeContext_->edgeNames_[edgeType_];
IterateNode<T>::name_ = "EdgeNode";
}

EdgeNode(RuntimeContext* context, EdgeContext* ctx) : context_(context), edgeContext_(ctx) {}
EdgeNode(RuntimeContext* context, EdgeContext* ctx) : context_(context), edgeContext_(ctx) {
IterateNode<T>::name_ = "EdgeNode";
}

RuntimeContext* context_;
EdgeContext* edgeContext_;
Expand All @@ -71,15 +74,17 @@ class EdgeNode : public IterateNode<T> {
// FetchEdgeNode is used to fetch a single edge
class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {
public:
using RelNode::execute;
using RelNode::doExecute;

FetchEdgeNode(RuntimeContext* context,
EdgeContext* edgeContext,
EdgeType edgeType,
const std::vector<PropContext>* props,
StorageExpressionContext* expCtx = nullptr,
Expression* exp = nullptr)
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {}
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {
name_ = "FetchEdgeNode";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

bool valid() const override { return valid_; }

Expand All @@ -91,9 +96,9 @@ class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {

RowReader* reader() const override { return reader_.get(); }

nebula::cpp2::ErrorCode execute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
valid_ = false;
auto ret = RelNode::execute(partId, edgeKey);
auto ret = RelNode::doExecute(partId, edgeKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down Expand Up @@ -143,14 +148,16 @@ class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {
// srcId
class SingleEdgeNode final : public EdgeNode<VertexID> {
public:
using RelNode::execute;
using RelNode::doExecute;
SingleEdgeNode(RuntimeContext* context,
EdgeContext* edgeContext,
EdgeType edgeType,
const std::vector<PropContext>* props,
StorageExpressionContext* expCtx = nullptr,
Expression* exp = nullptr)
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {}
: EdgeNode(context, edgeContext, edgeType, props, expCtx, exp) {
name_ = "SingleEdgeNode";
}

SingleEdgeIterator* iter() { return iter_.get(); }

Expand All @@ -164,8 +171,8 @@ class SingleEdgeNode final : public EdgeNode<VertexID> {

RowReader* reader() const override { return iter_->reader(); }

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
12 changes: 7 additions & 5 deletions src/storage/exec/FilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,24 @@ data, but not both.
As for GetNeighbors, it will have filter that involves both tag and edge
expression. In that case, FilterNode has a upstream of HashJoinNode, which will
keep popping out edge data. All tage data has been put into ExpressionContext
before FilterNode is executed. By that means, it can check the filter of tag +
before FilterNode is doExecuted. By that means, it can check the filter of tag +
edge.
*/
template <typename T>
class FilterNode : public IterateNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

FilterNode(RuntimeContext* context,
IterateNode<T>* upstream,
StorageExpressionContext* expCtx = nullptr,
Expression* exp = nullptr)
: IterateNode<T>(upstream), context_(context), expCtx_(expCtx), filterExp_(exp) {}
: IterateNode<T>(upstream), context_(context), expCtx_(expCtx), filterExp_(exp) {
IterateNode<T>::name_ = "FilterNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const T& vId) override {
auto ret = RelNode<T>::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const T& vId) override {
auto ret = RelNode<T>::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/exec/GetNeighborsNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace storage {
// target cell of a row.
class GetNeighborsNode : public QueryNode<VertexID> {
public:
using RelNode::execute;
using RelNode::doExecute;

GetNeighborsNode(RuntimeContext* context,
IterateNode<VertexID>* hashJoinNode,
Expand All @@ -35,10 +35,12 @@ class GetNeighborsNode : public QueryNode<VertexID> {
upstream_(upstream),
edgeContext_(edgeContext),
resultDataSet_(resultDataSet),
limit_(limit) {}
limit_(limit) {
name_ = "GetNeighborsNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
20 changes: 12 additions & 8 deletions src/storage/exec/GetPropNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ namespace storage {

class GetTagPropNode : public QueryNode<VertexID> {
public:
using RelNode<VertexID>::execute;
using RelNode<VertexID>::doExecute;

explicit GetTagPropNode(RuntimeContext* context,
std::vector<TagNode*> tagNodes,
nebula::DataSet* resultDataSet)
: context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet) {}
: context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet) {
name_ = "GetTagPropNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down Expand Up @@ -80,15 +82,17 @@ class GetTagPropNode : public QueryNode<VertexID> {

class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
public:
using RelNode::execute;
using RelNode::doExecute;

GetEdgePropNode(RuntimeContext* context,
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes,
nebula::DataSet* resultDataSet)
: context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet) {}
: context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet) {
QueryNode::name_ = "GetEdgePropNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
auto ret = RelNode::execute(partId, edgeKey);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
auto ret = RelNode::doExecute(partId, edgeKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
7 changes: 4 additions & 3 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace storage {
// return a iterator of edges which can pass ttl check and ready to be read.
class HashJoinNode : public IterateNode<VertexID> {
public:
using RelNode::execute;
using RelNode::doExecute;

HashJoinNode(RuntimeContext* context,
const std::vector<TagNode*>& tagNodes,
Expand All @@ -38,10 +38,11 @@ class HashJoinNode : public IterateNode<VertexID> {
edgeContext_(edgeContext),
expCtx_(expCtx) {
UNUSED(tagContext_);
IterateNode::name_ = "HashJoinNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::execute(partId, vId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/exec/IndexEdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace storage {
template <typename T>
class IndexEdgeNode final : public RelNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

IndexEdgeNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
Expand All @@ -25,10 +25,12 @@ class IndexEdgeNode final : public RelNode<T> {
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
schemaName_(schemaName) {}
schemaName_(schemaName) {
RelNode<T>::name_ = "IndexEdgeNode";
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
auto ret = RelNode<T>::execute(partId);
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
7 changes: 4 additions & 3 deletions src/storage/exec/IndexFilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace storage {
template <typename T>
class IndexFilterNode final : public RelNode<T> {
public:
using RelNode<T>::execute;
using RelNode<T>::doExecute;

// evalExprByIndex_ is true, all fileds in filter is in index. No need to read
// data anymore.
Expand All @@ -35,6 +35,7 @@ class IndexFilterNode final : public RelNode<T> {
filterExp_(exp),
isEdge_(isEdge) {
evalExprByIndex_ = true;
RelNode<T>::name_ = "IndexFilterNode";
}

// evalExprByIndex_ is false, some fileds in filter is out of index, which
Expand All @@ -59,9 +60,9 @@ class IndexFilterNode final : public RelNode<T> {
isEdge_ = false;
}

nebula::cpp2::ErrorCode execute(PartitionID partId) override {
nebula::cpp2::ErrorCode doExecute(PartitionID partId) override {
data_.clear();
auto ret = RelNode<T>::execute(partId);
auto ret = RelNode<T>::doExecute(partId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
Loading