Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
KqpGroup->GetHistogram("SinkWrites/BufferActorCommitLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));
BufferActorFlushLatencyHistogram =
KqpGroup->GetHistogram("SinkWrites/BufferActorFlushLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));

ForwardActorWritesSizeHistogram =
KqpGroup->GetHistogram("SinkWrites/ForwardActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1));
ForwardActorWritesLatencyHistogram =
Expand Down Expand Up @@ -876,6 +876,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co

TotalSingleNodeReqCount = KqpGroup->GetCounter("TotalSingleNodeReqCount", true);
NonLocalSingleNodeReqCount = KqpGroup->GetCounter("NonLocalSingleNodeReqCount", true);

/* Statistics performance */
QueryStatCpuCollectUs = KqpGroup->GetCounter("Query/Stat/CpuCollectUs", true);
QueryStatCpuFinishUs = KqpGroup->GetCounter("Query/Stat/CpuFinishUs", true);
QueryStatCpuConvertUs = KqpGroup->GetCounter("Query/Stat/CpuConvertUs", true);

QueryStatMemCollectInflightBytes = KqpGroup->GetCounter("Query/Stat/MemCollectInflightBytes", false);
QueryStatMemFinishInflightBytes = KqpGroup->GetCounter("Query/Stat/MemFinishInflightBytes", false);

QueryStatMemFinishBytes = KqpGroup->GetCounter("Query/Stat/MemFinishBytes", true);
QueryStatMemConvertBytes = KqpGroup->GetCounter("Query/Stat/MemConvertBytes", true);

}

::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
TConcurrentRWHashMap<TString, TKqpDbCountersPtr, 256> DbCounters;
TActorSystem* ActorSystem = nullptr;
TActorId DbWatcherActorId;

// Statistics CPU usage
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatCpuCollectUs;
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatCpuFinishUs;
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatCpuConvertUs;
// Statistics memory inflight (non-derivative counters)
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemCollectInflightBytes;
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemFinishInflightBytes;
// Statistics memory output (derivative counters)
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemFinishBytes;
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemConvertBytes;

};

struct TKqpRequestCounters : public TThrRefBase {
Expand Down
69 changes: 57 additions & 12 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,21 +425,33 @@ class TKqpExecuterBase : public TActor<TDerived> {

YQL_ENSURE(Stats);

if (state.HasStats() && Request.ProgressStatsPeriod) {
if (state.HasStats()) {
ui64 cycleCount = GetCycleCountFast();

Stats->UpdateTaskStats(taskId, state.GetStats());
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
Stats->ExportExecStats(execStats);
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
execStats.AddTxPlansWithStats(planWithStats);
if (Request.ProgressStatsPeriod) {
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
Stats->ExportExecStats(execStats);
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
execStats.AddTxPlansWithStats(planWithStats);
}
this->Send(Target, progress.Release());
LastProgressStats = now;
}
this->Send(Target, progress.Release());
LastProgressStats = now;
}
auto collectBytes = Stats->EstimateCollectMem();
auto deltaCpuTime = NHPTimer::GetSeconds(GetCycleCountFast() - cycleCount);

Counters->Counters->QueryStatMemCollectInflightBytes->Add(
static_cast<i64>(collectBytes) - static_cast<i64>(StatCollectInflightBytes)
);
StatCollectInflightBytes = collectBytes;
Counters->Counters->QueryStatCpuCollectUs->Add(deltaCpuTime * 1'000'000);
}

YQL_ENSURE(Planner);
Expand All @@ -450,6 +462,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
// Don't finalize stats twice.
if (Planner->CompletedCA(taskId, computeActor)) {
ui64 cycleCount = GetCycleCountFast();

auto& extraData = ExtraData[computeActor];
extraData.TaskId = taskId;
extraData.Data.Swap(state.MutableExtraData());
Expand All @@ -462,6 +476,15 @@ class TKqpExecuterBase : public TActor<TDerived> {

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();

auto collectBytes = Stats->EstimateFinishMem();
auto deltaCpuTime = NHPTimer::GetSeconds(GetCycleCountFast() - cycleCount);

Counters->Counters->QueryStatMemFinishInflightBytes->Add(
static_cast<i64>(collectBytes) - static_cast<i64>(StatFinishInflightBytes)
);
StatFinishInflightBytes = collectBytes;
Counters->Counters->QueryStatCpuFinishUs->Add(deltaCpuTime * 1'000'000);
}
default:
; // ignore all other states.
Expand Down Expand Up @@ -1974,15 +1997,26 @@ class TKqpExecuterBase : public TActor<TDerived> {
ReportEventElapsedTime();

Stats->FinishTs = TInstant::Now();

Stats->Finish();

if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {

ui64 jsonSize = 0;
ui64 cycleCount = GetCycleCountFast();

response.MutableResult()->MutableStats()->ClearTxPlansWithStats();
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
jsonSize += planWithStats.size();
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
}

auto deltaCpuTime = NHPTimer::GetSeconds(GetCycleCountFast() - cycleCount);
Counters->Counters->QueryStatCpuConvertUs->Add(deltaCpuTime * 1'000'000);
Counters->Counters->QueryStatMemConvertBytes->Add(jsonSize);
response.MutableResult()->MutableStats()->SetStatConvertBytes(jsonSize);
}

if (Stats->CollectStatsByLongTasks) {
Expand All @@ -1991,8 +2025,17 @@ class TKqpExecuterBase : public TActor<TDerived> {
LOG_I("Full stats: " << response.GetResult().GetStats());
}
}

auto finishSize = Stats->EstimateFinishMem();
Counters->Counters->QueryStatMemFinishBytes->Add(finishSize);
response.MutableResult()->MutableStats()->SetStatFinishBytes(finishSize);
}

Counters->Counters->QueryStatMemCollectInflightBytes->Sub(StatCollectInflightBytes);
StatCollectInflightBytes = 0;
Counters->Counters->QueryStatMemFinishInflightBytes->Sub(StatFinishInflightBytes);
StatFinishInflightBytes = 0;

Request.Transactions.crop(0);
this->Send(Target, ResponseEv.release());

Expand Down Expand Up @@ -2139,6 +2182,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
const bool VerboseMemoryLimitException;
TMaybe<ui8> ArrayBufferMinFillPercentage;

ui64 StatCollectInflightBytes = 0;
ui64 StatFinishInflightBytes = 0;
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
Expand Down
Loading
Loading