Skip to content

Commit

Permalink
[chore] [receiver/datadog] Add support for v2 series (#34180)
Browse files Browse the repository at this point in the history
**Description:**
This PR adds support for Datadog V2 series.

Follow up of #33631 and #33957.

The full version of the code can be found in the
`cedwards/datadog-metrics-receiver-full` branch, or in Grafana Alloy:
https://github.com/grafana/alloy/tree/main/internal/etc/datadogreceiver

**Link to tracking Issue:** 
#18278 

**Testing:** 
Unit tests, as well as an end-to-end test, have been added.
  • Loading branch information
carrieedwards authored Aug 13, 2024
1 parent c0ffc7b commit 55fdd84
Show file tree
Hide file tree
Showing 13 changed files with 565 additions and 89 deletions.
1 change: 1 addition & 0 deletions receiver/datadogreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datado
go 1.21.0

require (
github.com/DataDog/agent-payload/v5 v5.0.124
github.com/DataDog/datadog-agent/pkg/proto v0.56.0-rc.9
github.com/DataDog/datadog-api-client-go/v2 v2.28.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.107.0
Expand Down
2 changes: 2 additions & 0 deletions receiver/datadogreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions receiver/datadogreceiver/internal/translator/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ var metricTypeMap = map[string]pmetric.MetricType{
}

func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) dimensions {
resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool)
attrs := tagsToAttributes(tags, host, stringPool)
return dimensions{
name: name,
metricType: metricTypeMap[metricType],
buildInfo: version,
resourceAttrs: resourceAttrs,
scopeAttrs: scopeAttrs,
dpAttrs: dpAttrs,
resourceAttrs: attrs.resource,
scopeAttrs: attrs.scope,
dpAttrs: attrs.dp,
}
}

Expand Down
74 changes: 32 additions & 42 deletions receiver/datadogreceiver/internal/translator/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,25 @@ func TestMetricBatcher(t *testing.T) {
},
expect: func(t *testing.T, result pmetric.Metrics) {
// Different hosts should result in different ResourceMetrics
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 2, result.ResourceMetrics().Len())
resource1 := result.ResourceMetrics().At(0)
resource2 := result.ResourceMetrics().At(1)
v, exists := resource1.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = resource2.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host2", v.AsString())

res1ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource)

res2ExpectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host2", newStringPool())
requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource)

require.Equal(t, 1, resource1.ScopeMetrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().Len())

require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name())
requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
},
},
{
Expand Down Expand Up @@ -98,18 +99,19 @@ func TestMetricBatcher(t *testing.T) {
expect: func(t *testing.T, result pmetric.Metrics) {
// The different metrics will fall under the same ResourceMetric and ScopeMetric
// and there will be separate metrics under the ScopeMetric.Metrics()
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 1, result.ResourceMetrics().Len())
resource := result.ResourceMetrics().At(0)

v, exists := resource.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
expectedAttrs := tagsToAttributes([]string{"env:tag1", "service:test1", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)

require.Equal(t, 1, resource.ScopeMetrics().Len())

require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, "TestCount1", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestCount2", resource.ScopeMetrics().At(0).Metrics().At(1).Name())
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestCount2", 1)
},
},
{
Expand Down Expand Up @@ -142,21 +144,16 @@ func TestMetricBatcher(t *testing.T) {
},
expect: func(t *testing.T, result pmetric.Metrics) {
// Differences in attribute values should result in different resourceMetrics
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 2, result.ResourceMetrics().Len())
resource1 := result.ResourceMetrics().At(0)
resource2 := result.ResourceMetrics().At(1)
v, exists := resource1.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = resource2.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
v, exists = resource1.Resource().Attributes().Get("deployment.environment")
require.True(t, exists)
require.Equal(t, "dev", v.AsString())
v, exists = resource2.Resource().Attributes().Get("deployment.environment")
require.True(t, exists)
require.Equal(t, "prod", v.AsString())

res1ExpectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource1.Resource().Attributes(), res1ExpectedAttrs.resource)

res2ExpectedAttrs := tagsToAttributes([]string{"env:prod", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource2.Resource().Attributes(), res2ExpectedAttrs.resource)

require.Equal(t, 1, resource1.ScopeMetrics().Len())
require.Equal(t, 1, resource1.ScopeMetrics().Len())
Expand All @@ -167,8 +164,8 @@ func TestMetricBatcher(t *testing.T) {
require.Equal(t, 1, resource1.ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, 1, resource2.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestCount1", resource1.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestCount1", resource2.ScopeMetrics().At(0).Metrics().At(0).Name())
requireSum(t, resource1.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
requireSum(t, resource2.ScopeMetrics().At(0).Metrics().At(0), "TestCount1", 1)
},
},
{
Expand Down Expand Up @@ -203,22 +200,20 @@ func TestMetricBatcher(t *testing.T) {
// The different metrics will fall under the same ResourceMetric and ScopeMetric
// and there will be separate metrics under the ScopeMetric.Metrics() due to the different
// data types
requireMetricAndDataPointCounts(t, result, 2, 2)
require.Equal(t, 1, result.ResourceMetrics().Len())
resource := result.ResourceMetrics().At(0)

v, exists := resource.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)

require.Equal(t, 1, resource.ScopeMetrics().Len())

require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name())
require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(1).Name())

require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type())
require.Equal(t, pmetric.MetricTypeGauge, resource.ScopeMetrics().At(0).Metrics().At(1).Type())
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 1)
requireGauge(t, resource.ScopeMetrics().At(0).Metrics().At(1), "TestMetric", 1)
},
},
{
Expand Down Expand Up @@ -253,21 +248,16 @@ func TestMetricBatcher(t *testing.T) {
// Same host, tags, and metric name but two different datapoints
// should result in a single resourceMetric, scopeMetric, and metric
// but two different datapoints under that metric
requireMetricAndDataPointCounts(t, result, 1, 2)
require.Equal(t, 1, result.ResourceMetrics().Len())
resource := result.ResourceMetrics().At(0)

v, exists := resource.Resource().Attributes().Get("host.name")
require.True(t, exists)
require.Equal(t, "Host1", v.AsString())
expectedAttrs := tagsToAttributes([]string{"env:dev", "version:tag1"}, "Host1", newStringPool())
requireResourceAttributes(t, resource.Resource().Attributes(), expectedAttrs.resource)

require.Equal(t, 1, resource.ScopeMetrics().Len())

require.Equal(t, 1, resource.ScopeMetrics().At(0).Metrics().Len())

require.Equal(t, "TestMetric", resource.ScopeMetrics().At(0).Metrics().At(0).Name())

require.Equal(t, pmetric.MetricTypeSum, resource.ScopeMetrics().At(0).Metrics().At(0).Type())
require.Equal(t, 2, resource.ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len())
requireSum(t, resource.ScopeMetrics().At(0).Metrics().At(0), "TestMetric", 2)
},
},
}
Expand Down
85 changes: 85 additions & 0 deletions receiver/datadogreceiver/internal/translator/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"io"
"net/http"
"strings"
"time"

"github.com/DataDog/agent-payload/v5/gogen"
datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -23,6 +27,22 @@ type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

// TODO: add handling for JSON format in additional to protobuf?
func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gogen.MetricPayload_MetricSeries, err error) {
buf := GetBuffer()
defer PutBuffer(buf)
if _, err := io.Copy(buf, req.Body); err != nil {
return mp, err
}

pl := new(gogen.MetricPayload)
if err := pl.Unmarshal(buf.Bytes()); err != nil {
return mp, err
}

return pl.GetSeries(), nil
}

func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metrics {
bt := newBatcher()

Expand Down Expand Up @@ -87,3 +107,68 @@ func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metric
}
return bt.Metrics
}

func (mt *MetricsTranslator) TranslateSeriesV2(series []*gogen.MetricPayload_MetricSeries) pmetric.Metrics {
bt := newBatcher()

for _, serie := range series {
var dps pmetric.NumberDataPointSlice

// The V2 payload stores the host name under in the Resources field
resourceMap := getV2Resources(serie.Resources)
// TODO(jesus.vazquez) (Do this with string interning)
dimensions := parseSeriesProperties(serie.Metric, strings.ToLower(serie.Type.String()), serie.Tags, resourceMap["host"], mt.buildInfo.Version, mt.stringPool)
for k, v := range resourceMap {
if k == "host" {
continue // Host has already been added as a resource attribute in parseSeriesProperties(), so avoid duplicating that attribute
}
dimensions.resourceAttrs.PutStr(k, v)
}
dimensions.resourceAttrs.PutStr("source", serie.SourceTypeName) //TODO: check if this is correct handling of SourceTypeName field
metric, metricID := bt.Lookup(dimensions)

switch serie.Type {
case gogen.MetricPayload_COUNT:
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.Sum().SetIsMonotonic(false) // See https://docs.datadoghq.com/metrics/types/?tab=count#definition
dps = metric.Sum().DataPoints()
case gogen.MetricPayload_GAUGE:
dps = metric.Gauge().DataPoints()
case gogen.MetricPayload_RATE:
metric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) //TODO: verify that this is always the case
dps = metric.Sum().DataPoints()
case gogen.MetricPayload_UNSPECIFIED:
// Type is unset/unspecified
continue
}

dps.EnsureCapacity(len(serie.Points))

for _, point := range serie.Points {
dp := dps.AppendEmpty()
dp.SetTimestamp(pcommon.Timestamp(point.Timestamp * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds
dimensions.dpAttrs.CopyTo(dp.Attributes()) // TODO(jesus.vazquez) Review this copy
val := point.Value
if serie.Type == gogen.MetricPayload_RATE && serie.Interval != 0 {
val *= float64(serie.Interval)
}
dp.SetDoubleValue(val)

stream := identity.OfStream(metricID, dp)
ts, ok := mt.streamHasTimestamp(stream)
if ok {
dp.SetStartTimestamp(ts)
}
mt.updateLastTsForStream(stream, dp.Timestamp())
}
}
return bt.Metrics
}

func getV2Resources(resources []*gogen.MetricPayload_Resource) map[string]string {
resourceMap := make(map[string]string)
for i := range resources {
resourceMap[resources[i].Type] = resources[i].Name
}
return resourceMap
}
Loading

0 comments on commit 55fdd84

Please sign in to comment.