Skip to content

Commit

Permalink
add arithmetic aggregate, add per-pool cpu usage (#2703)
Browse files Browse the repository at this point in the history
  • Loading branch information
adameat authored Mar 13, 2024
1 parent 6cb1176 commit 6f5a6b8
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 25 deletions.
14 changes: 14 additions & 0 deletions ydb/core/graph/api/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ struct TEvGraph {
metric->MutableHistogramBounds()->Add(bounds.begin(), bounds.end());
metric->MutableHistogramValues()->Add(values.begin(), values.end());;
}

// Appends one arithmetic metric to the outgoing record.
//   name   - metric name
//   valueA - first operand
//   op     - operator character; only '/' is mapped to an operation
//   valueB - second operand
void AddArithmeticMetric(const TString& name, double valueA, char op, double valueB) {
    auto* arithmetic = Record.AddArithmeticMetrics();
    arithmetic->SetName(name);
    // Any operator other than '/' is ignored here: SetOp is simply not called.
    if (op == '/') {
        arithmetic->SetOp(NKikimrGraph::TArithmeticMetric::EOP_DIVISION);
    }
    arithmetic->SetValueA(valueA);
    arithmetic->SetValueB(valueB);
}
};

struct TEvGetMetrics : TEventPB<TEvGetMetrics, NKikimrGraph::TEvGetMetrics, EvGetMetrics> {
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/graph/protos/graph.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@ message TMetric {
optional double Value = 2;
}

// A named metric described by two operands and the operation that combines them.
message TArithmeticMetric {
// Operation that combines ValueA and ValueB.
enum EOP {
EOP_UNKNOWN = 0; // zero value; no operation selected
EOP_DIVISION = 1; // division of ValueA by ValueB
}

string Name = 1; // metric name
EOP Op = 2; // operation to apply to the operands
double ValueA = 3; // first operand
double ValueB = 4; // second operand
}

// Batch of metrics sent in a single event; carries three kinds of metrics.
message TEvSendMetrics {
repeated TMetric Metrics = 1; // plain metrics
optional uint64 Time = 2; // for testing purposes only
repeated THistogramMetric HistogramMetrics = 3; // histogram metrics
repeated TArithmeticMetric ArithmeticMetrics = 4; // metrics defined by two operands and an operation
}

message TEvGetMetrics {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/graph/shard/backends.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ struct TMetricsData {
TInstant Timestamp;
std::unordered_map<TString, double> Values;
std::unordered_map<TString, std::map<ui64, ui64>> HistogramValues;

// Two operands plus the operator character that combines them.
// All fields start out zeroed.
struct TArithmeticMetric {
    char Op = '\0';      // operator symbol (for example '/'); zero until one is assigned
    double ValueA = 0.0; // first operand
    double ValueB = 0.0; // second operand
};

std::unordered_map<TString, TArithmeticMetric> ArithmeticValues;
};

struct TAggregateSettings {
Expand Down
56 changes: 41 additions & 15 deletions ydb/core/graph/shard/shard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,54 @@ void TGraphShard::ApplyConfig(const NKikimrConfig::TGraphConfig& config) {
}
}

void TGraphShard::MergeHistogram(std::map<ui64, ui64>& dest, const NKikimrGraph::THistogramMetric& src) {
// Adds the buckets of `src` into the histogram stored in `data` under the same name.
// Bounds and values are paired by index; if the two lists differ in length,
// only the common prefix is merged.
void TGraphShard::MergeHistogram(TMetricsData& data, const NKikimrGraph::THistogramMetric& src) {
    auto& buckets = data.HistogramValues[src.GetName()];
    const size_t pairCount = std::min(src.HistogramBoundsSize(), src.HistogramValuesSize());
    for (size_t idx = 0; idx != pairCount; ++idx) {
        buckets[src.GetHistogramBounds(idx)] += src.GetHistogramValues(idx);
    }
}

void TGraphShard::AggregateHistograms(TMetricsData& data) {
// Folds one arithmetic metric into the accumulator kept in `data` under the same name.
// For division, both operands are summed separately and the operator is recorded as '/'.
// Any other operation leaves the accumulator's fields untouched.
void TGraphShard::MergeArithmetic(TMetricsData& data, const NKikimrGraph::TArithmeticMetric& src) {
    // The lookup happens unconditionally, so an entry for the name exists afterwards
    // even when the operation is not handled below.
    auto& accum = data.ArithmeticValues[src.GetName()];
    if (src.GetOp() == NKikimrGraph::TArithmeticMetric::EOP_DIVISION) {
        accum.Op = '/';
        accum.ValueA += src.GetValueA();
        accum.ValueB += src.GetValueB();
    }
}

// Merges every metric carried by `src` into `data`:
// plain metrics are summed by name, histogram and arithmetic metrics
// are handed to their dedicated merge routines.
void TGraphShard::MergeMetrics(TMetricsData& data, const NKikimrGraph::TEvSendMetrics& src) {
    auto& plainValues = data.Values;
    for (const auto& plain : src.GetMetrics()) {
        // simple accumulation by name of metric
        plainValues[plain.GetName()] += plain.GetValue();
    }
    for (const auto& histogram : src.GetHistogramMetrics()) {
        MergeHistogram(data, histogram);
    }
    for (const auto& arithmetic : src.GetArithmeticMetrics()) {
        MergeArithmetic(data, arithmetic);
    }
}

// Collapses the accumulated histogram and arithmetic metrics of `data` into plain
// values stored in data.Values, then clears both accumulators.
//  - each histogram is passed to AggregateHistogram together with data.Values;
//  - each '/' arithmetic metric is stored as ValueA / ValueB under its own name.
void TGraphShard::AggregateMetrics(TMetricsData& data) {
    for (const auto& [name, hist] : data.HistogramValues) {
        AggregateHistogram(data.Values, name, hist);
    }
    data.HistogramValues.clear();
    for (const auto& [name, arithm] : data.ArithmeticValues) {
        switch (arithm.Op) {
            case '/':
                // A zero divisor would store inf/NaN as the metric value;
                // skip the metric instead of recording a meaningless number.
                if (arithm.ValueB != 0) {
                    data.Values[name] = arithm.ValueA / arithm.ValueB;
                }
                break;
            default:
                // No operation recorded for this entry: nothing to compute.
                break;
        }
    }
    data.ArithmeticValues.clear();
}

void TGraphShard::AggregateHistogram(std::unordered_map<TString, double>& values, const TString& name, const std::map<ui64, ui64>& histogram) {
Expand Down Expand Up @@ -157,12 +193,7 @@ void TGraphShard::Handle(TEvGraph::TEvSendMetrics::TPtr& ev) {
if (ev->Get()->Record.HasTime()) { // direct insertion
TMetricsData data;
data.Timestamp = TInstant::Seconds(ev->Get()->Record.GetTime());
for (const auto& metric : ev->Get()->Record.GetMetrics()) {
data.Values[metric.GetName()] = metric.GetValue();
}
for (const auto& metric : ev->Get()->Record.GetHistogramMetrics()) {
MergeHistogram(data.HistogramValues[metric.GetName()], metric);
}
MergeMetrics(data, ev->Get()->Record);
BLOG_TRACE("Executing direct TxStoreMetrics");
ExecuteTxStoreMetrics(std::move(data));
return;
Expand All @@ -187,13 +218,8 @@ void TGraphShard::Handle(TEvGraph::TEvSendMetrics::TPtr& ev) {
}
}
}
// aggregation
for (const auto& metric : ev->Get()->Record.GetMetrics()) {
MetricsData.Values[metric.GetName()] += metric.GetValue(); // simple accumulation by name of metric
}
for (auto& histMetric : ev->Get()->Record.GetHistogramMetrics()) {
MergeHistogram(MetricsData.HistogramValues[histMetric.GetName()], histMetric);
}

MergeMetrics(MetricsData, ev->Get()->Record);
}

void TGraphShard::Handle(TEvGraph::TEvGetMetrics::TPtr& ev) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/graph/shard/shard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ class TGraphShard : public TActor<TGraphShard>, public NTabletFlatExecutor::TTab
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) override;
void OnReadyToWork();
void ApplyConfig(const NKikimrConfig::TGraphConfig& config);
static void MergeHistogram(std::map<ui64, ui64>& dest, const NKikimrGraph::THistogramMetric& src);
static void AggregateHistograms(TMetricsData& data);
static void MergeMetrics(TMetricsData& data, const NKikimrGraph::TEvSendMetrics& src);
static void MergeHistogram(TMetricsData& data, const NKikimrGraph::THistogramMetric& src);
static void MergeArithmetic(TMetricsData& data, const NKikimrGraph::TArithmeticMetric& src);
static void AggregateMetrics(TMetricsData& data);
static void AggregateHistogram(std::unordered_map<TString, double>& values, const TString& name, const std::map<ui64, ui64>& histogram);

void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/graph/shard/tx_store_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TTxStoreMetrics : public TTransactionBase<TGraphShard> {
};

void TGraphShard::ExecuteTxStoreMetrics(TMetricsData&& data) {
AggregateHistograms(data);
AggregateMetrics(data);
switch (BackendType) {
case EBackendType::Memory:
MemoryBackend.StoreMetrics(std::move(data));
Expand Down
20 changes: 13 additions & 7 deletions ydb/core/sys_view/service/ext_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,27 @@ class TExtCountersUpdaterActor
if (!Config.Pools.empty()) {
double cpuUsage = 0;
for (size_t i = 0; i < Config.Pools.size(); ++i) {
double usedCore = 0;
double limitCore = 0;
if (PoolElapsedMicrosec[i]) {
auto elapsedMs = PoolElapsedMicrosec[i]->Val();
double usedCore = elapsedMs / 10000.;
CpuUsedCorePercents[i]->Set(usedCore);
CpuUsedCorePercents[i]->Set(elapsedMs / 10000.);
if (PoolElapsedMicrosecPrevValue[i] != 0) {
cpuUsage += (elapsedMs - PoolElapsedMicrosecPrevValue[i]) / 1000000.;
usedCore = (elapsedMs - PoolElapsedMicrosecPrevValue[i]) / 1000000.;
cpuUsage += usedCore;
}
PoolElapsedMicrosecPrevValue[i] = elapsedMs;
}
if (PoolCurrentThreadCount[i] && PoolCurrentThreadCount[i]->Val()) {
double limitCore = PoolCurrentThreadCount[i]->Val() * 100;
CpuLimitCorePercents[i]->Set(limitCore);
limitCore = PoolCurrentThreadCount[i]->Val();
CpuLimitCorePercents[i]->Set(limitCore * 100);
} else {
double limitCore = Config.Pools[i].ThreadCount * 100;
CpuLimitCorePercents[i]->Set(limitCore);
limitCore = Config.Pools[i].ThreadCount * 100;
CpuLimitCorePercents[i]->Set(limitCore * 100);
}
if (limitCore > 0) {
metrics->AddArithmeticMetric(TStringBuilder() << "resources.cpu." << Config.Pools[i].Name << ".usage",
usedCore, '/', limitCore);
}
}
metrics->AddMetric("resources.cpu.usage", cpuUsage);
Expand Down

0 comments on commit 6f5a6b8

Please sign in to comment.