Skip to content

Commit

Permalink
Fix stats init order (#3586)
Browse files Browse the repository at this point in the history
Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
3 people authored and jievince committed Dec 29, 2021
1 parent 3e71921 commit f37b131
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 7 deletions.
20 changes: 19 additions & 1 deletion src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,25 @@ Executor *Executor::makeExecutor(const PlanNode *node,
// static
Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
auto pool = qctx->objPool();
auto &spaceName = qctx->rctx()->session()->spaceName();
switch (node->kind()) {
case PlanNode::Kind::kPassThrough: {
return pool->add(new PassThroughExecutor(node, qctx));
}
case PlanNode::Kind::kAggregate: {
stats::StatsManager::addValue(kNumAggregateExecutors);
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumAggregateExecutors, {{"space", spaceName}}));
}
return pool->add(new AggregateExecutor(node, qctx));
}
case PlanNode::Kind::kSort: {
stats::StatsManager::addValue(kNumSortExecutors);
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumSortExecutors, {{"space", spaceName}}));
}
return pool->add(new SortExecutor(node, qctx));
}
case PlanNode::Kind::kTopN: {
Expand Down Expand Up @@ -208,6 +217,10 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kTagIndexPrefixScan:
case PlanNode::Kind::kTagIndexRangeScan: {
stats::StatsManager::addValue(kNumIndexScanExecutors);
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumIndexScanExecutors, {{"space", spaceName}}));
}
return pool->add(new IndexScanExecutor(node, qctx));
}
case PlanNode::Kind::kStart: {
Expand Down Expand Up @@ -594,7 +607,12 @@ Status Executor::close() {

Status Executor::checkMemoryWatermark() {
if (node_->isQueryNode() && MemoryUtils::kHitMemoryHighWatermark.load()) {
stats::StatsManager::addValue(kNumOomExecutors);
stats::StatsManager::addValue(kNumOomQueries);
auto &spaceName = qctx()->rctx()->session()->spaceName();
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumOomQueries, {{"space", spaceName}}));
}
return Status::Error("Used memory hits the high watermark(%lf) of total system memory.",
FLAGS_system_memory_high_watermark_ratio);
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/optimizer/rule/IndexScanRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

#include "graph/optimizer/rule/IndexScanRule.h"

#include "graph/optimizer/OptContext.h"
#include "graph/optimizer/OptGroup.h"
#include "graph/optimizer/OptRule.h"
Expand Down
9 changes: 7 additions & 2 deletions src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ folly::Future<ExecutionResponse> GraphService::future_executeWithParameter(
ctx->setRunner(getThreadManager());
ctx->setSessionMgr(sessionManager_.get());
auto future = ctx->future();
stats::StatsManager::addValue(kNumQueries);
stats::StatsManager::addValue(kNumActiveQueries);
// When the sessionId is 0, it means the clients to ping the connection is ok
if (sessionId == 0) {
ctx->resp().errorCode = ErrorCode::E_SESSION_INVALID;
Expand All @@ -161,14 +159,21 @@ folly::Future<ExecutionResponse> GraphService::future_executeWithParameter(
return ctx->finish();
}
stats::StatsManager::addValue(kNumQueries);
stats::StatsManager::addValue(kNumActiveQueries);
if (FLAGS_enable_space_level_metrics && sessionPtr->space().name != "") {
stats::StatsManager::addValue(stats::StatsManager::counterWithLabels(
kNumQueries, {{"space", sessionPtr->space().name}}));
stats::StatsManager::addValue(stats::StatsManager::counterWithLabels(
kNumActiveQueries, {{"space", sessionPtr->space().name}}));
}
ctx->setSession(std::move(sessionPtr));
ctx->setParameterMap(parameterMap);
queryEngine_->execute(std::move(ctx));
stats::StatsManager::decValue(kNumActiveQueries);
if (FLAGS_enable_space_level_metrics && sessionPtr->space().name != "") {
stats::StatsManager::decValue(stats::StatsManager::counterWithLabels(
kNumActiveQueries, {{"space", sessionPtr->space().name}}));
}
};
sessionManager_->findSession(sessionId, getThreadManager()).thenValue(std::move(cb));
return future;
Expand Down
19 changes: 18 additions & 1 deletion src/graph/service/QueryInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,33 @@ void QueryInstance::execute() {

Status QueryInstance::validateAndOptimize() {
auto *rctx = qctx()->rctx();
auto &spaceName = rctx->session()->space().name;
VLOG(1) << "Parsing query: " << rctx->query();
auto result = GQLParser(qctx()).parse(rctx->query());
NG_RETURN_IF_ERROR(result);
sentence_ = std::move(result).value();
if (sentence_->kind() == Sentence::Kind::kSequential) {
size_t num = static_cast<const SequentialSentences *>(sentence_.get())->numSentences();
stats::StatsManager::addValue(kNumSentences, num);
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumSentences, {{"space", spaceName}}), num);
}
} else {
stats::StatsManager::addValue(kNumSentences);
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumSentences, {{"space", spaceName}}));
}
}

NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx()));
NG_RETURN_IF_ERROR(findBestPlan());
stats::StatsManager::addValue(kOptimizerLatencyUs, *(qctx_->plan()->optimizeTimeInUs()));
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(
stats::StatsManager::histoWithLabels(kOptimizerLatencyUs, {{"space", spaceName}}));
}

return Status::OK();
}
Expand Down Expand Up @@ -117,6 +130,7 @@ void QueryInstance::onFinish() {
void QueryInstance::onError(Status status) {
LOG(ERROR) << status;
auto *rctx = qctx()->rctx();
auto &spaceName = rctx->session()->space().name;
switch (status.code()) {
case Status::Code::kOk:
rctx->resp().errorCode = ErrorCode::SUCCEEDED;
Expand All @@ -135,6 +149,10 @@ void QueryInstance::onError(Status status) {
break;
case Status::Code::kLeaderChanged:
stats::StatsManager::addValue(kNumQueryErrorsLeaderChanges);
if (FLAGS_enable_space_level_metrics && spaceName != "") {
stats::StatsManager::addValue(stats::StatsManager::counterWithLabels(
kNumQueryErrorsLeaderChanges, {{"space", spaceName}}));
}
[[fallthrough]];
case Status::Code::kBalanced:
case Status::Code::kEdgeNotFound:
Expand All @@ -157,7 +175,6 @@ void QueryInstance::onError(Status status) {
rctx->resp().errorCode = ErrorCode::E_EXECUTION_ERROR;
break;
}
auto &spaceName = rctx->session()->space().name;
rctx->resp().spaceName = std::make_unique<std::string>(spaceName);
rctx->resp().errorMsg = std::make_unique<std::string>(status.toString());
auto latency = rctx->duration().elapsedInUSec();
Expand Down
9 changes: 9 additions & 0 deletions src/graph/session/ClientSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ void ClientSession::markQueryKilled(nebula::ExecutionPlanID epId) {
}
context->second->markKilled();
stats::StatsManager::addValue(kNumKilledQueries);
if (FLAGS_enable_space_level_metrics && space_.name != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumKilledQueries, {{"space", space_.name}}));
}
VLOG(1) << "Mark query killed in local cache, epId: " << epId;

auto query = session_.queries_ref()->find(epId);
Expand All @@ -95,6 +99,11 @@ void ClientSession::markAllQueryKilled() {
session_.queries_ref()->clear();
}
stats::StatsManager::addValue(kNumKilledQueries, contexts_.size());
if (FLAGS_enable_space_level_metrics && space_.name != "") {
stats::StatsManager::addValue(
stats::StatsManager::counterWithLabels(kNumKilledQueries, {{"space", space_.name}}),
contexts_.size());
}
}
} // namespace graph
} // namespace nebula
4 changes: 2 additions & 2 deletions src/graph/stats/GraphStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ stats::CounterId kNumSentences;
stats::CounterId kQueryLatencyUs;
stats::CounterId kSlowQueryLatencyUs;
stats::CounterId kNumKilledQueries;
stats::CounterId kNumOomQueries;

stats::CounterId kOptimizerLatencyUs;

stats::CounterId kNumAggregateExecutors;
stats::CounterId kNumSortExecutors;
stats::CounterId kNumIndexScanExecutors;
stats::CounterId kNumOomExecutors;

stats::CounterId kNumOpenedSessions;
stats::CounterId kNumAuthFailedSessions;
Expand All @@ -54,6 +54,7 @@ void initGraphStats() {
kSlowQueryLatencyUs = stats::StatsManager::registerHisto(
"slow_query_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kNumKilledQueries = stats::StatsManager::registerStats("num_killed_queries", "rate, sum");
kNumOomQueries = stats::StatsManager::registerStats("num_oom_queries", "rate, sum");

kOptimizerLatencyUs = stats::StatsManager::registerHisto(
"optimizer_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
Expand All @@ -63,7 +64,6 @@ void initGraphStats() {
kNumSortExecutors = stats::StatsManager::registerStats("num_sort_executors", "rate, sum");
kNumIndexScanExecutors =
stats::StatsManager::registerStats("num_indexscan_executors", "rate, sum");
kNumOomExecutors = stats::StatsManager::registerStats("num_oom_executors", "rate, sum");

kNumOpenedSessions = stats::StatsManager::registerStats("num_opened_sessions", "rate, sum");
kNumAuthFailedSessions =
Expand Down
2 changes: 1 addition & 1 deletion src/graph/stats/GraphStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ extern stats::CounterId kNumSentences;
extern stats::CounterId kQueryLatencyUs;
extern stats::CounterId kSlowQueryLatencyUs;
extern stats::CounterId kNumKilledQueries;
extern stats::CounterId kNumOomQueries;

extern stats::CounterId kOptimizerLatencyUs;

// Executor
extern stats::CounterId kNumAggregateExecutors;
extern stats::CounterId kNumSortExecutors;
extern stats::CounterId kNumIndexScanExecutors;
extern stats::CounterId kNumOomExecutors;

// Server client traffic
// extern stats::CounterId kReceivedBytes;
Expand Down

0 comments on commit f37b131

Please sign in to comment.