
kubeletstats receiver: do not break down metrics batch (#754)
Do not break down the metrics batch before sending it to the downstream consumer.
dmitryax authored Aug 19, 2020
1 parent 0a07047 commit 8183bd9
Showing 8 changed files with 147 additions and 30 deletions.
95 changes: 95 additions & 0 deletions receiver/kubeletstatsreceiver/go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions receiver/kubeletstatsreceiver/kubelet/accumulator.go
@@ -43,7 +43,7 @@ var ValidMetricGroups = map[MetricGroup]bool{
}

type metricDataAccumulator struct {
m []*consumerdata.MetricsData
m []consumerdata.MetricsData
metadata Metadata
logger *zap.Logger
metricGroupsToCollect map[MetricGroup]bool
@@ -151,7 +151,7 @@ func (a *metricDataAccumulator) accumulate(
}
}
}
a.m = append(a.m, &consumerdata.MetricsData{
a.m = append(a.m, consumerdata.MetricsData{
Resource: r,
Metrics: resourceMetrics,
})
2 changes: 1 addition & 1 deletion receiver/kubeletstatsreceiver/kubelet/accumulator_test.go
@@ -169,7 +169,7 @@ func TestMetadataErrorCases(t *testing.T) {
observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)

var mds []*consumerdata.MetricsData
var mds []consumerdata.MetricsData
acc := metricDataAccumulator{
m: mds,
metadata: tt.metadata,
2 changes: 1 addition & 1 deletion receiver/kubeletstatsreceiver/kubelet/metrics.go
@@ -28,7 +28,7 @@ func MetricsData(
metadata Metadata,
typeStr string,
metricGroupsToCollect map[MetricGroup]bool,
) []*consumerdata.MetricsData {
) []consumerdata.MetricsData {
acc := &metricDataAccumulator{
metadata: metadata,
logger: logger,
4 changes: 2 additions & 2 deletions receiver/kubeletstatsreceiver/kubelet/metrics_test.go
@@ -49,7 +49,7 @@ func TestMetricAccumulator(t *testing.T) {
require.Equal(t, 0, len(MetricsData(zap.NewNop(), summary, metadata, "", map[MetricGroup]bool{})))
}

func requireMetricsDataOk(t *testing.T, mds []*consumerdata.MetricsData) {
func requireMetricsDataOk(t *testing.T, mds []consumerdata.MetricsData) {
for _, md := range mds {
requireResourceOk(t, md.Resource)
for _, metric := range md.Metrics {
@@ -161,7 +161,7 @@ func indexedFakeMetrics() map[string][]*metricspb.Metric {
return metrics
}

func fakeMetrics() []*consumerdata.MetricsData {
func fakeMetrics() []consumerdata.MetricsData {
rc := &fakeRestClient{}
statsProvider := NewStatsProvider(rc)
summary, _ := statsProvider.StatsSummary()
22 changes: 11 additions & 11 deletions receiver/kubeletstatsreceiver/runnable.go
@@ -18,7 +18,6 @@ import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/zap"
@@ -88,17 +87,18 @@ func (r *runnable) Run() error {

metadata := kubelet.NewMetadata(r.extraMetadataLabels, podsMetadata)
mds := kubelet.MetricsData(r.logger, summary, metadata, typeStr, r.metricGroupsToCollect)
metrics := pdatautil.MetricsFromMetricsData(mds)

var numTimeSeries, numPoints int
ctx := obsreport.ReceiverContext(r.ctx, typeStr, transport, r.receiverName)
for _, md := range mds {
ctx = obsreport.StartMetricsReceiveOp(ctx, typeStr, transport)
err = r.consumer.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{*md}))
var numTimeSeries, numPoints int
if err != nil {
r.logger.Error("ConsumeMetricsData failed", zap.Error(err))
} else {
numTimeSeries, numPoints = obsreport.CountMetricPoints(*md)
}
obsreport.EndMetricsReceiveOp(ctx, typeStr, numTimeSeries, numPoints, err)
ctx = obsreport.StartMetricsReceiveOp(ctx, typeStr, transport)
err = r.consumer.ConsumeMetrics(ctx, metrics)
if err != nil {
r.logger.Error("ConsumeMetricsData failed", zap.Error(err))
} else {
numTimeSeries, numPoints = pdatautil.MetricAndDataPointCount(metrics)
}
obsreport.EndMetricsReceiveOp(ctx, typeStr, numTimeSeries, numPoints, err)

return nil
}
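The hunk above is the core of the change: instead of looping over the accumulated MetricsData and wrapping each element in its own one-element batch, Run now converts the whole slice once and makes a single ConsumeMetrics call (with a single obsreport receive operation). A condensed, hedged sketch of that flow follows; it uses only calls that appear in this diff, but the helper name, its parameter list, and the use of consumer.MetricsConsumer as the downstream interface are assumptions made to keep the snippet self-contained.

```go
package sketch

import (
	"context"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumerdata"
	"go.opentelemetry.io/collector/consumer/pdatautil"
	"go.opentelemetry.io/collector/obsreport"
	"go.uber.org/zap"
)

// consumeAsSingleBatch is a hypothetical helper mirroring the new body of
// runnable.Run: one conversion, one ConsumeMetrics call, and one obsreport
// receive operation per scrape.
func consumeAsSingleBatch(
	ctx context.Context,
	logger *zap.Logger,
	next consumer.MetricsConsumer, // assumed pdata-based consumer interface
	mds []consumerdata.MetricsData,
	typeStr, transport, receiverName string, // constants/fields in the real receiver, passed in here for self-containment
) {
	// Convert the whole slice at once; previously each MetricsData was
	// wrapped in a one-element slice and sent in its own call.
	metrics := pdatautil.MetricsFromMetricsData(mds)

	ctx = obsreport.ReceiverContext(ctx, typeStr, transport, receiverName)
	ctx = obsreport.StartMetricsReceiveOp(ctx, typeStr, transport)
	err := next.ConsumeMetrics(ctx, metrics)

	var numTimeSeries, numPoints int
	if err != nil {
		logger.Error("ConsumeMetricsData failed", zap.Error(err))
	} else {
		numTimeSeries, numPoints = pdatautil.MetricAndDataPointCount(metrics)
	}
	obsreport.EndMetricsReceiveOp(ctx, typeStr, numTimeSeries, numPoints, err)
}
```

Because a consume error is still only logged, a failed push now produces one error log per scrape rather than one per resource, which is what the updated TestConsumerErrors expectation below (1 instead of dataLen) reflects.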
32 changes: 19 additions & 13 deletions receiver/kubeletstatsreceiver/runnable_test.go
@@ -31,13 +31,19 @@ import (
)

const (
dataLen = numContainers + numPods + numNodes + numVolumes
dataLen = numContainers*containerMetrics + numPods*podMetrics + numNodes*nodeMetrics + numVolumes*volumeMetrics

// Number of resources by type in testdata/stats-summary.json
numContainers = 9
numPods = 9
numNodes = 1
numVolumes = 9
numVolumes = 8

// Number of metrics by resource
nodeMetrics = 15
podMetrics = 15
containerMetrics = 11
volumeMetrics = 5
)

var allMetricGroups = map[kubelet.MetricGroup]bool{
@@ -63,7 +69,7 @@ func TestRunnable(t *testing.T) {
require.NoError(t, err)
err = r.Run()
require.NoError(t, err)
require.Equal(t, dataLen, len(consumer.AllMetrics()))
require.Equal(t, dataLen, consumer.MetricsCount())
}

func TestRunnableWithMetadata(t *testing.T) {
@@ -81,7 +87,7 @@ func TestRunnableWithMetadata(t *testing.T) {
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.ContainerMetricGroup: true,
},
dataLen: numContainers,
dataLen: numContainers * containerMetrics,
metricPrefix: "container.",
requiredLabel: "container.id",
},
@@ -91,7 +97,7 @@ func TestRunnableWithMetadata(t *testing.T) {
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.VolumeMetricGroup: true,
},
dataLen: numVolumes,
dataLen: numVolumes * volumeMetrics,
metricPrefix: "k8s.volume.",
requiredLabel: "k8s.volume.type",
},
@@ -115,7 +121,7 @@ func TestRunnableWithMetadata(t *testing.T) {
require.NoError(t, err)
err = r.Run()
require.NoError(t, err)
require.Equal(t, tt.dataLen, len(consumer.AllMetrics()))
require.Equal(t, tt.dataLen, consumer.MetricsCount())

for _, metrics := range consumer.AllMetrics() {
md := pdatautil.MetricsToMetricsData(metrics)[0]
@@ -147,36 +153,36 @@ func TestRunnableWithMetricGroups(t *testing.T) {
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.ContainerMetricGroup: true,
},
dataLen: numContainers,
dataLen: numContainers * containerMetrics,
},
{
name: "only pod group",
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.PodMetricGroup: true,
},
dataLen: numPods,
dataLen: numPods * podMetrics,
},
{
name: "only node group",
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.NodeMetricGroup: true,
},
dataLen: numNodes,
dataLen: numNodes * nodeMetrics,
},
{
name: "only volume group",
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.VolumeMetricGroup: true,
},
dataLen: numVolumes,
dataLen: numVolumes * volumeMetrics,
},
{
name: "pod and node groups",
metricGroups: map[kubelet.MetricGroup]bool{
kubelet.PodMetricGroup: true,
kubelet.NodeMetricGroup: true,
},
dataLen: numNodes + numPods,
dataLen: numNodes*nodeMetrics + numPods*podMetrics,
},
}
for _, test := range tests {
@@ -199,7 +205,7 @@ func TestRunnableWithMetricGroups(t *testing.T) {
err = r.Run()
require.NoError(t, err)

require.Equal(t, test.dataLen, len(consumer.AllMetrics()))
require.Equal(t, test.dataLen, consumer.MetricsCount())
})
}
}
@@ -279,7 +285,7 @@ func TestConsumerErrors(t *testing.T) {
numLogs int
}{
{"no error", false, 0},
{"consume error", true, dataLen},
{"consume error", true, 1},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
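For reference, a quick check of the new dataLen expression using the constants in the runnable_test.go hunk above (expectedDataLen is a hypothetical name used only for this illustration):

```go
// 9 containers * 11 metrics = 99
// 9 pods       * 15 metrics = 135
// 1 node       * 15 metrics = 15
// 8 volumes    *  5 metrics = 40
const expectedDataLen = 9*11 + 9*15 + 1*15 + 8*5 // = 289 metrics in the single batch
```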
16 changes: 16 additions & 0 deletions receiver/kubeletstatsreceiver/testdata/stats-summary.json
@@ -12,6 +12,7 @@
},
"memory": {
"time": "2020-04-20T22:52:23Z",
"availableBytes": 171589632,
"usageBytes": 33259520,
"workingSetBytes": 28827648,
"rssBytes": 29151232,
@@ -29,6 +30,7 @@
},
"memory": {
"time": "2020-04-20T22:52:27Z",
"availableBytes": 171589632,
"usageBytes": 1186713600,
"workingSetBytes": 300175360,
"rssBytes": 86695936,
@@ -139,6 +141,7 @@
},
"memory": {
"time": "2020-04-20T22:52:26Z",
"availableBytes": 171589632,
"usageBytes": 13701120,
"workingSetBytes": 11640832,
"rssBytes": 12660736,
@@ -172,6 +175,7 @@
},
"memory": {
"time": "2020-04-20T22:52:18Z",
"availableBytes": 171589632,
"usageBytes": 14290944,
"workingSetBytes": 12230656,
"rssBytes": 12660736,
@@ -230,6 +234,7 @@
},
"memory": {
"time": "2020-04-20T22:52:23Z",
"availableBytes": 171589632,
"usageBytes": 25088000,
"workingSetBytes": 25083904,
"rssBytes": 23523328,
@@ -263,6 +268,7 @@
},
"memory": {
"time": "2020-04-20T22:52:25Z",
"availableBytes": 171589632,
"usageBytes": 25726976,
"workingSetBytes": 25722880,
"rssBytes": 23523328,
@@ -337,6 +343,7 @@
},
"memory": {
"time": "2020-04-20T22:52:20Z",
"availableBytes": 171589632,
"usageBytes": 268869632,
"workingSetBytes": 243318784,
"rssBytes": 266645504,
@@ -370,6 +377,7 @@
},
"memory": {
"time": "2020-04-20T22:52:24Z",
"availableBytes": 171589632,
"usageBytes": 269459456,
"workingSetBytes": 243908608,
"rssBytes": 266645504,
@@ -658,6 +666,7 @@
},
"memory": {
"time": "2020-04-20T22:52:24Z",
"availableBytes": 171589632,
"usageBytes": 38068224,
"workingSetBytes": 37040128,
"rssBytes": 36741120,
@@ -691,6 +700,7 @@
},
"memory": {
"time": "2020-04-20T22:52:21Z",
"availableBytes": 171589632,
"usageBytes": 38703104,
"workingSetBytes": 37675008,
"rssBytes": 36741120,
@@ -749,6 +759,7 @@
},
"memory": {
"time": "2020-04-20T22:52:22Z",
"availableBytes": 171589632,
"usageBytes": 9715712,
"workingSetBytes": 8736768,
"rssBytes": 8065024,
@@ -782,6 +793,7 @@
},
"memory": {
"time": "2020-04-20T22:52:14Z",
"availableBytes": 171589632,
"usageBytes": 10280960,
"workingSetBytes": 9302016,
"rssBytes": 8081408,
@@ -862,6 +874,7 @@
},
"memory": {
"time": "2020-04-20T22:52:23Z",
"availableBytes": 171589632,
"usageBytes": 13877248,
"workingSetBytes": 13877248,
"rssBytes": 12865536,
@@ -895,6 +908,7 @@
},
"memory": {
"time": "2020-04-20T22:52:15Z",
"availableBytes": 171589632,
"usageBytes": 14356480,
"workingSetBytes": 14356480,
"rssBytes": 12865536,
@@ -965,6 +979,7 @@
},
"memory": {
"time": "2020-04-20T22:52:17Z",
"availableBytes": 171589632,
"usageBytes": 38199296,
"workingSetBytes": 33415168,
"rssBytes": 31907840,
@@ -998,6 +1013,7 @@
},
"memory": {
"time": "2020-04-20T22:52:17Z",
"availableBytes": 171589632,
"usageBytes": 38768640,
"workingSetBytes": 33984512,
"rssBytes": 31907840,
