Skip to content

Commit

Permalink
Q stable ydb 24 2 2024 07 12 decompressor stats (ydb-platform#6620)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored and uzhastik committed Sep 11, 2024
1 parent 142702a commit ed2488b
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 21 deletions.
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/actors/pending_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
if (Monitoring) {
Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"),
"fetcher", "Pending Fetcher", false, TActivationContext::ActorSystem(), SelfId());
Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"),
"local_worker_manager", "Local Worker Manager", false, TActivationContext::ActorSystem(), NYql::NDqs::MakeWorkerManagerActorID(SelfId().NodeId()));
}

Become(&TPendingFetcher::StateFunc);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct TTotalStatistics {
TAggregate ResultBytes;
TAggregate ResultRows;
TAggregate IngressBytes;
TAggregate IngressDecompressedBytes;
TAggregate IngressRows;
TAggregate EgressBytes;
TAggregate EgressRows;
Expand Down Expand Up @@ -202,6 +203,8 @@ void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const T
totals.ResultRows.Add(*sum);
} else if (name == "IngressBytes") {
totals.IngressBytes.Add(*sum);
} else if (name == "IngressDecompressedBytes") {
totals.IngressDecompressedBytes.Add(*sum);
} else if (name == "IngressRows") {
totals.IngressRows.Add(*sum);
} else if (name == "EgressBytes") {
Expand Down Expand Up @@ -371,6 +374,7 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
totals.ResultBytes.Write(writer, "ResultBytes");
totals.ResultRows.Write(writer, "ResultRows");
totals.IngressBytes.Write(writer, "IngressBytes");
totals.IngressDecompressedBytes.Write(writer, "IngressDecompressedBytes");
totals.IngressRows.Write(writer, "IngressRows");
totals.EgressBytes.Write(writer, "EgressBytes");
totals.EgressRows.Write(writer, "EgressRows");
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ using namespace NYql::NDq;

void TAsyncStats::Resize(ui32 taskCount) {
Bytes.resize(taskCount);
DecompressedBytes.resize(taskCount);
Rows.resize(taskCount);
Chunks.resize(taskCount);
Splits.resize(taskCount);
Expand Down Expand Up @@ -49,6 +50,7 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
ResultBytes.resize(taskCount);
IngressRows.resize(taskCount);
IngressBytes.resize(taskCount);
IngressDecompressedBytes.resize(taskCount);
EgressRows.resize(taskCount);
EgressBytes.resize(taskCount);

Expand All @@ -74,6 +76,7 @@ void SetNonZero(ui64& target, ui64 source) {

void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes());
SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes());
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());
Expand Down Expand Up @@ -117,6 +120,7 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
SetNonZero(IngressDecompressedBytes[index], taskStats.GetIngressDecompressedBytes());
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());

Expand Down Expand Up @@ -208,6 +212,7 @@ void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {

struct TAsyncGroupStat {
ui64 Bytes = 0;
ui64 DecompressedBytes = 0;
ui64 Rows = 0;
ui64 Chunks = 0;
ui64 Splits = 0;
Expand All @@ -222,6 +227,7 @@ struct TAsyncGroupStat {

void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
UpdateAggr(asyncAggr.MutableBytes(), asyncStat.GetBytes());
UpdateAggr(asyncAggr.MutableDecompressedBytes(), asyncStat.GetDecompressedBytes());
UpdateAggr(asyncAggr.MutableRows(), asyncStat.GetRows());
UpdateAggr(asyncAggr.MutableChunks(), asyncStat.GetChunks());
UpdateAggr(asyncAggr.MutableSplits(), asyncStat.GetSplits());
Expand Down Expand Up @@ -355,6 +361,7 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
UpdateAggr(stageStats->MutableResultBytes(), task.GetResultBytes());
UpdateAggr(stageStats->MutableIngressRows(), task.GetIngressRows());
UpdateAggr(stageStats->MutableIngressBytes(), task.GetIngressBytes());
UpdateAggr(stageStats->MutableIngressDecompressedBytes(), task.GetIngressDecompressedBytes());
UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows());
UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes());

Expand Down Expand Up @@ -713,6 +720,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportAggStats(p.second.ResultBytes, *stageStats.MutableResultBytes());
ExportAggStats(p.second.IngressRows, *stageStats.MutableIngressRows());
ExportAggStats(p.second.IngressBytes, *stageStats.MutableIngressBytes());
ExportAggStats(p.second.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows());
ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes());

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
struct TAsyncStats {
// Data
std::vector<ui64> Bytes;
std::vector<ui64> DecompressedBytes;
std::vector<ui64> Rows;
std::vector<ui64> Chunks;
std::vector<ui64> Splits;
Expand Down Expand Up @@ -81,6 +82,7 @@ struct TStageExecutionStats {
std::vector<ui64> ResultBytes;
std::vector<ui64> IngressRows;
std::vector<ui64> IngressBytes;
std::vector<ui64> IngressDecompressedBytes;
std::vector<ui64> EgressRows;
std::vector<ui64> EgressBytes;

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,9 @@ void FillAsyncAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqAsyncSt
if (asyncAggr.HasBytes()) {
FillAggrStat(node, asyncAggr.GetBytes(), "Bytes");
}
if (asyncAggr.HasDecompressedBytes()) {
FillAggrStat(node, asyncAggr.GetDecompressedBytes(), "DecompressedBytes");
}
if (asyncAggr.HasRows()) {
FillAggrStat(node, asyncAggr.GetRows(), "Rows");
}
Expand Down Expand Up @@ -2054,6 +2057,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
SetNonZero(node, "ResultBytes", taskStats.GetResultBytes());
SetNonZero(node, "IngressRows", taskStats.GetIngressRows());
SetNonZero(node, "IngressBytes", taskStats.GetIngressBytes());
SetNonZero(node, "IngressDecompressedBytes", taskStats.GetIngressDecompressedBytes());
SetNonZero(node, "EgressRows", taskStats.GetEgressRows());
SetNonZero(node, "EgressBytes", taskStats.GetEgressBytes());

Expand Down Expand Up @@ -2150,6 +2154,9 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
if ((*stat)->HasIngressBytes()) {
FillAggrStat(stats, (*stat)->GetIngressBytes(), "IngressBytes");
}
if ((*stat)->HasIngressDecompressedBytes()) {
FillAggrStat(stats, (*stat)->GetIngressDecompressedBytes(), "IngressDecompressedBytes");
}
if ((*stat)->HasEgressRows()) {
FillAggrStat(stats, (*stat)->GetEgressRows(), "EgressRows");
}
Expand Down
Loading

0 comments on commit ed2488b

Please sign in to comment.