Skip to content

Commit b2f257e

Browse files
authored
Support Min/Max aggregation for partitioned stats (i.e. First/Last message timestamps) (#18142)
1 parent 4c573ce commit b2f257e

File tree

2 files changed

+65
-9
lines changed

2 files changed

+65
-9
lines changed

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
148148
Resize(partCount);
149149
}
150150

151-
void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
151+
void TPartitionedStats::SetNonZeroAggSum(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
152152
if (value) {
153153
AFL_ENSURE(partIndex < Parts.size());
154154
auto& part = Parts[partIndex];
@@ -164,7 +164,43 @@ void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, b
164164
}
165165
}
166166

167-
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
167+
void TPartitionedStats::SetNonZeroAggMin(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
168+
if (value) {
169+
AFL_ENSURE(partIndex < Parts.size());
170+
auto& part = Parts[partIndex];
171+
AFL_ENSURE(taskIndex < part.size());
172+
part[taskIndex] = value;
173+
AFL_ENSURE(partIndex < Values.size());
174+
if (Values[partIndex] == 0 || value < Values[partIndex]) {
175+
// Min/Max is related to Parts[] only, Values[] should kepp count Sum as well
176+
Sum = Sum + value - Values[partIndex];
177+
Values[partIndex] = value;
178+
if (recordTimeSeries) {
179+
AppendHistory();
180+
}
181+
}
182+
}
183+
}
184+
185+
void TPartitionedStats::SetNonZeroAggMax(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
186+
if (value) {
187+
AFL_ENSURE(partIndex < Parts.size());
188+
auto& part = Parts[partIndex];
189+
AFL_ENSURE(taskIndex < part.size());
190+
part[taskIndex] = value;
191+
AFL_ENSURE(partIndex < Values.size());
192+
if (value > Values[partIndex]) {
193+
// Min/Max is related to Parts[] only, Values[] should kepp count Sum as well
194+
Sum = Sum + value - Values[partIndex];
195+
Values[partIndex] = value;
196+
if (recordTimeSeries) {
197+
AppendHistory();
198+
}
199+
}
200+
}
201+
}
202+
203+
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries, EPartitionedAggKind aggKind) {
168204
auto [it, inserted] = Indices.try_emplace(key);
169205
if (inserted) {
170206
it->second = Indices.size() - 1;
@@ -175,7 +211,18 @@ void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex,
175211
if (stats.Parts.size() < PartCount) {
176212
stats.ResizeByParts(PartCount, TaskCount);
177213
}
178-
stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries);
214+
215+
switch (aggKind) {
216+
case EPartitionedAggKind::PartitionedAggSum:
217+
stats.SetNonZeroAggSum(taskIndex, it->second, value, recordTimeSeries);
218+
break;
219+
case EPartitionedAggKind::PartitionedAggMin:
220+
stats.SetNonZeroAggMin(taskIndex, it->second, value, recordTimeSeries);
221+
break;
222+
case EPartitionedAggKind::PartitionedAggMax:
223+
stats.SetNonZeroAggMax(taskIndex, it->second, value, recordTimeSeries);
224+
break;
225+
}
179226
}
180227

181228
void TExternalStats::Resize(ui32 taskCount) {
@@ -524,13 +571,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
524571
for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
525572
auto key = partitionStat.GetPartitionId();
526573
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
527-
index, key, partitionStat.GetExternalRows(), false);
574+
index, key, partitionStat.GetExternalRows(), false, EPartitionedAggKind::PartitionedAggSum);
528575
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
529-
index, key, partitionStat.GetExternalBytes(), true);
576+
index, key, partitionStat.GetExternalBytes(), true, EPartitionedAggKind::PartitionedAggSum);
530577
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
531-
index, key, partitionStat.GetFirstMessageMs(), false);
578+
index, key, partitionStat.GetFirstMessageMs(), false, EPartitionedAggKind::PartitionedAggMin);
532579
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
533-
index, key, partitionStat.GetLastMessageMs(), false);
580+
index, key, partitionStat.GetLastMessageMs(), false, EPartitionedAggKind::PartitionedAggMax);
534581
}
535582
}
536583
}
@@ -798,6 +845,7 @@ void TQueryExecutionStats::Prepare() {
798845
auto [it, inserted] = StageStats.try_emplace(stageId);
799846
Y_ENSURE(inserted);
800847
it->second.StageId = stageId;
848+
it->second.SetHistorySampleCount(HistorySampleCount);
801849
}
802850
// connections
803851
for (auto& [_, stageStats] : StageStats) {

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,23 @@ struct TPartitionedStats : public TTimeSeriesStats {
5252

5353
void ResizeByTasks(ui32 taskCount);
5454
void ResizeByParts(ui32 partCount, ui32 taskCount);
55-
void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
55+
void SetNonZeroAggSum(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
56+
void SetNonZeroAggMin(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
57+
void SetNonZeroAggMax(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
58+
};
59+
60+
enum EPartitionedAggKind {
61+
PartitionedAggSum,
62+
PartitionedAggMin,
63+
PartitionedAggMax,
5664
};
5765

5866
struct TTimeMultiSeriesStats {
5967
std::unordered_map<TString, ui32> Indices;
6068
ui32 TaskCount = 0;
6169
ui32 PartCount = 0;
6270

63-
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
71+
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries, EPartitionedAggKind aggKind);
6472
};
6573

6674
struct TExternalStats : public TTimeMultiSeriesStats {

0 commit comments

Comments
 (0)