Batch the metrics with the same dimensions into the same EMF log request (#2317)
mxiamxia authored Feb 22, 2021
1 parent f7c7ef2 commit 0761ee3
Showing 13 changed files with 2,329 additions and 2,482 deletions.
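
The heart of the change is grouping data points that share an identical dimension (label) set so that they end up in the same EMF log event. Below is a minimal, self-contained Go sketch of that grouping idea; the dataPoint type and dimensionKey helper are illustrative stand-ins, not the exporter's actual types.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// dataPoint is a hypothetical stand-in for an exported metric sample.
type dataPoint struct {
	Name   string
	Value  float64
	Labels map[string]string
}

// dimensionKey folds a label map into a canonical string by sorting the
// "k=v" pairs, so points with identical dimensions land in the same batch.
func dimensionKey(labels map[string]string) string {
	pairs := make([]string, 0, len(labels))
	for k, v := range labels {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ",")
}

func main() {
	points := []dataPoint{
		{"latency", 1.2, map[string]string{"service": "api", "OTelLib": "cloudwatch-otel"}},
		{"errors", 3, map[string]string{"service": "api", "OTelLib": "cloudwatch-otel"}},
		{"latency", 0.8, map[string]string{"service": "db", "OTelLib": "cloudwatch-otel"}},
	}

	// One batch (one future EMF log event) per distinct dimension set.
	batches := make(map[string][]dataPoint)
	for _, p := range points {
		key := dimensionKey(p.Labels)
		batches[key] = append(batches[key], p)
	}
	for key, group := range batches {
		fmt.Printf("EMF event for dimensions [%s]: %d metrics\n", key, len(group))
	}
}
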
27 changes: 17 additions & 10 deletions exporter/awsemfexporter/datapoint.go
@@ -62,6 +62,7 @@ type rateKeyParams struct {
metricNameKey string
logGroupKey string
logStreamKey string
timestampKey string
labels label.Distinct
}

@@ -101,12 +102,12 @@ type DoubleSummaryDataPointSlice struct {
func (dps IntDataPointSlice) At(i int) DataPoint {
metric := dps.IntDataPointSlice.At(i)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
labels := createLabels(metric.LabelsMap())
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)

var metricVal float64
metricVal = float64(metric.Value())
if dps.needsCalculateRate {
sortedLabels := getSortedLabels(metric.LabelsMap())
sortedLabels := getSortedLabels(labels)
dps.rateKeyParams.labels = sortedLabels
rateKey := dps.rateKeyParams
rateTS := dps.timestampMs
@@ -127,13 +128,13 @@ func (dps IntDataPointSlice) At(i int) DataPoint {
// At retrieves the DoubleDataPoint at the given index and performs rate calculation if necessary.
func (dps DoubleDataPointSlice) At(i int) DataPoint {
metric := dps.DoubleDataPointSlice.At(i)
labels := createLabels(metric.LabelsMap())
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())

var metricVal float64
metricVal = metric.Value()
if dps.needsCalculateRate {
sortedLabels := getSortedLabels(metric.LabelsMap())
sortedLabels := getSortedLabels(labels)
dps.rateKeyParams.labels = sortedLabels
rateKey := dps.rateKeyParams
rateTS := dps.timestampMs
@@ -154,7 +155,7 @@ func (dps DoubleDataPointSlice) At(i int) DataPoint {
// At retrieves the DoubleHistogramDataPoint at the given index.
func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
metric := dps.DoubleHistogramDataPointSlice.At(i)
labels := createLabels(metric.LabelsMap())
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
timestamp := unixNanoToMilliseconds(metric.Timestamp())

return DataPoint{
@@ -170,7 +171,7 @@ func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
// At retrieves the DoubleSummaryDataPoint at the given index.
func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint {
metric := dps.DoubleSummaryDataPointSlice.At(i)
labels := createLabels(metric.LabelsMap())
labels := createLabels(metric.LabelsMap(), dps.instrumentationLibraryName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())

metricVal := &CWMetricStats{
@@ -190,22 +191,28 @@ func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint {
}

// createLabels converts OTel StringMap labels to a map
func createLabels(labelsMap pdata.StringMap) map[string]string {
// and optionally adds in the OTel instrumentation library name
func createLabels(labelsMap pdata.StringMap, instrLibName string) map[string]string {
labels := make(map[string]string, labelsMap.Len()+1)
labelsMap.ForEach(func(k, v string) {
labels[k] = v
})

// Add OTel instrumentation lib name as an additional label if it is defined
if instrLibName != noInstrumentationLibraryName {
labels[oTellibDimensionKey] = instrLibName
}

return labels
}
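
The new createLabels above copies the OTel labels and, when an instrumentation library name is present, adds it as an extra dimension. Here is a standalone sketch of the same behavior using a plain map; the "undefined" sentinel and the "OTelLib" key are assumed values standing in for the exporter's noInstrumentationLibraryName and oTellibDimensionKey constants.

package main

import "fmt"

// Assumed stand-ins for the exporter's constants noInstrumentationLibraryName
// and oTellibDimensionKey; the real values live in the exporter package.
const (
	noInstrLib = "undefined"
	otelLibKey = "OTelLib"
)

// createLabelsSketch mirrors createLabels above, taking a plain map instead
// of a pdata.StringMap.
func createLabelsSketch(src map[string]string, instrLibName string) map[string]string {
	labels := make(map[string]string, len(src)+1)
	for k, v := range src {
		labels[k] = v
	}
	// Attach the instrumentation library name as an extra dimension when set.
	if instrLibName != noInstrLib {
		labels[otelLibKey] = instrLibName
	}
	return labels
}

func main() {
	fmt.Println(createLabelsSketch(map[string]string{"label1": "value1"}, "cloudwatch-otel"))
	// map[OTelLib:cloudwatch-otel label1:value1]
	fmt.Println(createLabelsSketch(map[string]string{"label1": "value1"}, noInstrLib))
	// map[label1:value1]
}
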

// getSortedLabels converts a label map to sorted labels as label.Distinct
func getSortedLabels(labelsMap pdata.StringMap) label.Distinct {
func getSortedLabels(labels map[string]string) label.Distinct {
var kvs []label.KeyValue
var sortable label.Sortable
labelsMap.ForEach(func(k, v string) {
for k, v := range labels {
kvs = append(kvs, label.String(k, v))
})
}
set := label.NewSetWithSortable(kvs, &sortable)

return set.Equivalent()
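
getSortedLabels now builds the rate-calculation key from the already-merged label map (including the OTelLib dimension), so two maps carrying the same pairs compare equal regardless of iteration order. A small sketch of why label.Distinct works as that key; the go.opentelemetry.io/otel/label import path is an assumption matching the otel-go API of this era.

package main

import (
	"fmt"

	// Assumed import path for the label package used by getSortedLabels above.
	"go.opentelemetry.io/otel/label"
)

// sortedKey mirrors getSortedLabels: it folds a label map into a canonical,
// comparable label.Distinct that is independent of map iteration order.
func sortedKey(labels map[string]string) label.Distinct {
	var kvs []label.KeyValue
	var sortable label.Sortable
	for k, v := range labels {
		kvs = append(kvs, label.String(k, v))
	}
	return label.NewSetWithSortable(kvs, &sortable).Equivalent()
}

func main() {
	a := map[string]string{"k1": "v1", "k2": "v2"}
	b := map[string]string{"k2": "v2", "k1": "v1"} // same pairs, different insertion order

	// Equal Distinct values mean both points hit the same rate-calculation state.
	fmt.Println(sortedKey(a) == sortedKey(b)) // true
}
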
106 changes: 71 additions & 35 deletions exporter/awsemfexporter/datapoint_test.go
@@ -16,6 +16,7 @@ package awsemfexporter

import (
"reflect"
"strings"
"testing"
"time"

@@ -242,7 +243,6 @@ func generateTestSummary(name string) *metricspb.Metric {
}

func TestIntDataPointSliceAt(t *testing.T) {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
instrLibName := "cloudwatch-otel"
labels := map[string]string{"label1": "value1"}
rateKeys := rateKeyParams{
@@ -256,25 +256,35 @@ func TestIntDataPointSliceAt(t *testing.T) {
testName string
needsCalculateRate bool
value interface{}
calculatedValue interface{}
}{
{
"no rate calculation",
false,
int64(-17),
float64(-17),
},
{
"w/ rate calculation",
"w/ 1st rate calculation",
true,
int64(1),
float64(0),
},
{
"w/ 2nd rate calculation",
true,
int64(2),
float64(1),
},
}

for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
testDPS := pdata.NewIntDataPointSlice()
testDPS.Resize(1)
testDP := testDPS.At(0)
testDP.SetValue(int64(-17))
testDP.SetValue(tc.value.(int64))
testDP.LabelsMap().InitFromMap(labels)

dps := IntDataPointSlice{
@@ -288,21 +298,27 @@ func TestIntDataPointSliceAt(t *testing.T) {
}

expectedDP := DataPoint{
Value: tc.value,
Value: tc.calculatedValue,
Labels: map[string]string{
"label1": "value1",
oTellibDimensionKey: instrLibName,
"label1": "value1",
},
}

assert.Equal(t, 1, dps.Len())
dp := dps.At(0)
assert.Equal(t, expectedDP, dp)
if strings.Contains(tc.testName, "2nd rate") {
assert.True(t, (expectedDP.Value.(float64)-dp.Value.(float64)) < 0.01)
} else {
assert.Equal(t, expectedDP, dp)
}
// sleep 1s for verifying the cumulative metric delta rate
time.Sleep(1000 * time.Millisecond)
})
}
}
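
The table-driven cases above exercise the cumulative-to-rate conversion: the first point seen for a given (metric, labels) key yields 0, and every later point yields the value delta divided by the elapsed seconds, which is why the test sleeps 1s between iterations. Below is a hypothetical sketch of that behavior; rateState and calculateRate are illustrative names, not the exporter's implementation.

package main

import "fmt"

// rateState holds the previous observation for one (metric, labels) key.
type rateState struct {
	prevValue float64
	prevTsMs  int64
}

var rateCache = map[string]*rateState{}

// calculateRate returns 0 for the first observation of a key and the
// delta rate (value change per second) for every later observation.
func calculateRate(key string, value float64, timestampMs int64) float64 {
	prev, ok := rateCache[key]
	rateCache[key] = &rateState{prevValue: value, prevTsMs: timestampMs}
	if !ok {
		return 0 // first observation: nothing to diff against yet
	}
	deltaSec := float64(timestampMs-prev.prevTsMs) / 1000.0
	if deltaSec <= 0 {
		return 0
	}
	return (value - prev.prevValue) / deltaSec
}

func main() {
	key := "metric|label1=value1|OTelLib=cloudwatch-otel"
	fmt.Println(calculateRate(key, 1, 0))    // 0: matches "w/ 1st rate calculation"
	fmt.Println(calculateRate(key, 2, 1000)) // 1: matches "w/ 2nd rate calculation", delta 1 over 1s
}
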

func TestDoubleDataPointSliceAt(t *testing.T) {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
instrLibName := "cloudwatch-otel"
labels := map[string]string{"label1": "value1"}
rateKeys := rateKeyParams{
@@ -316,25 +332,35 @@ func TestDoubleDataPointSliceAt(t *testing.T) {
testName string
needsCalculateRate bool
value interface{}
calculatedValue interface{}
}{
{
"no rate calculation",
false,
float64(0.3),
float64(0.3),
},
{
"w/ rate calculation",
"w/ 1st rate calculation",
true,
float64(0),
float64(0.4),
float64(0.0),
},
{
"w/ 2nd rate calculation",
true,
float64(0.5),
float64(0.1),
},
}

for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
testDPS := pdata.NewDoubleDataPointSlice()
testDPS.Resize(1)
testDP := testDPS.At(0)
testDP.SetValue(float64(0.3))
testDP.SetValue(tc.value.(float64))
testDP.LabelsMap().InitFromMap(labels)

dps := DoubleDataPointSlice{
@@ -348,15 +374,22 @@ func TestDoubleDataPointSliceAt(t *testing.T) {
}

expectedDP := DataPoint{
Value: tc.value,
Value: tc.calculatedValue,
Labels: map[string]string{
"label1": "value1",
oTellibDimensionKey: instrLibName,
"label1": "value1",
},
}

assert.Equal(t, 1, dps.Len())
dp := dps.At(0)
assert.Equal(t, expectedDP, dp)
if strings.Contains(tc.testName, "2nd rate") {
assert.True(t, (expectedDP.Value.(float64)-dp.Value.(float64)) < 0.002)
} else {
assert.Equal(t, expectedDP, dp)
}
// sleep 1s for verifying the cumulative metric delta rate
time.Sleep(1000 * time.Millisecond)
})
}
}
@@ -385,7 +418,8 @@ func TestDoubleHistogramDataPointSliceAt(t *testing.T) {
Count: 17,
},
Labels: map[string]string{
"label1": "value1",
oTellibDimensionKey: instrLibName,
"label1": "value1",
},
}

@@ -425,7 +459,8 @@ func TestDoubleSummaryDataPointSliceAt(t *testing.T) {
Sum: 17.13,
},
Labels: map[string]string{
"label1": "value1",
oTellibDimensionKey: instrLibName,
"label1": "value1",
},
}

@@ -442,11 +477,12 @@ func TestCreateLabels(t *testing.T) {
}
labelsMap := pdata.NewStringMap().InitFromMap(expectedLabels)

labels := createLabels(labelsMap)
labels := createLabels(labelsMap, noInstrumentationLibraryName)
assert.Equal(t, expectedLabels, labels)

// With instrumentation library name
labels = createLabels(labelsMap)
labels = createLabels(labelsMap, "cloudwatch-otel")
expectedLabels[oTellibDimensionKey] = "cloudwatch-otel"
assert.Equal(t, expectedLabels, labels)
}

@@ -708,13 +744,13 @@ func BenchmarkGetDataPoints(b *testing.B) {
}

func TestGetSortedLabelsEquals(t *testing.T) {
labelMap1 := pdata.NewStringMap()
labelMap1.Insert("k1", "v1")
labelMap1.Insert("k2", "v2")
labelMap1 := make(map[string]string)
labelMap1["k1"] = "v1"
labelMap1["k2"] = "v2"

labelMap2 := pdata.NewStringMap()
labelMap2.Insert("k2", "v2")
labelMap2.Insert("k1", "v1")
labelMap2 := make(map[string]string)
labelMap2["k2"] = "v2"
labelMap2["k1"] = "v1"

sortedLabels1 := getSortedLabels(labelMap1)
sortedLabels2 := getSortedLabels(labelMap2)
@@ -737,13 +773,13 @@ func TestGetSortedLabelsEquals(t *testing.T) {
}

func TestGetSortedLabelsNotEqual(t *testing.T) {
labelMap1 := pdata.NewStringMap()
labelMap1.Insert("k1", "v1")
labelMap1.Insert("k2", "v2")
labelMap1 := make(map[string]string)
labelMap1["k1"] = "v1"
labelMap1["k2"] = "v2"

labelMap2 := pdata.NewStringMap()
labelMap2.Insert("k2", "v2")
labelMap2.Insert("k1", "v3")
labelMap2 := make(map[string]string)
labelMap2["k2"] = "v2"
labelMap2["k1"] = "v3"

sortedLabels1 := getSortedLabels(labelMap1)
sortedLabels2 := getSortedLabels(labelMap2)
@@ -766,13 +802,13 @@ func TestGetSortedLabelsNotEqual(t *testing.T) {
}

func TestGetSortedLabelsNotEqualOnPram(t *testing.T) {
labelMap1 := pdata.NewStringMap()
labelMap1.Insert("k1", "v1")
labelMap1.Insert("k2", "v2")
labelMap1 := make(map[string]string)
labelMap1["k1"] = "v1"
labelMap1["k2"] = "v2"

labelMap2 := pdata.NewStringMap()
labelMap2.Insert("k2", "v2")
labelMap2.Insert("k1", "v1")
labelMap2 := make(map[string]string)
labelMap2["k2"] = "v2"
labelMap2["k1"] = "v1"

sortedLabels1 := getSortedLabels(labelMap1)
sortedLabels2 := getSortedLabels(labelMap2)