Skip to content

Commit

Permalink
[exporter/datadog] Send host metadata on change, refactor main host m…
Browse files Browse the repository at this point in the history
…etadata sending (#25145)

**Description:** 

- [\*/datadog] Bump opentelemetry-mapping-go/\* to v0.7.1
- [exporter/datadog] Use reporter for exporter host as well
  • Loading branch information
mx-psi authored Sep 5, 2023
1 parent 1fd40fb commit 5e25c8b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 21 deletions.
28 changes: 28 additions & 0 deletions .chloggen/mx-psi_inframetadata-base-reporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Host metadata for remote hosts is now reported on first sight or on change

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [25145]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Host metadata for remote hosts will only be sent for payloads with the datadog.host.use_as_metadata resource attribute.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
6 changes: 3 additions & 3 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (f *factory) createMetricsExporter(
if md.ResourceMetrics().Len() > 0 {
attrs = md.ResourceMetrics().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs)
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})

// Consume resources for host metadata
Expand Down Expand Up @@ -331,7 +331,7 @@ func (f *factory) createTracesExporter(
if td.ResourceSpans().Len() > 0 {
attrs = td.ResourceSpans().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs)
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
// Consume resources for host metadata
for i := 0; i < td.ResourceSpans().Len(); i++ {
Expand Down Expand Up @@ -402,7 +402,7 @@ func (f *factory) createLogsExporter(
pusher = func(_ context.Context, td plog.Logs) error {
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs)
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs, metadataReporter)
})
for i := 0; i < td.ResourceLogs().Len(); i++ {
res := td.ResourceLogs().At(i).Resource()
Expand Down
20 changes: 7 additions & 13 deletions exporter/datadogexporter/internal/hostmetadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,11 @@ func NewPusher(params exporter.CreateSettings, pcfg PusherConfig) inframetadata.

// RunPusher to push host metadata payloads from the host where the Collector is running periodically to Datadog intake.
// This function is blocking and it is meant to be run on a goroutine.
func RunPusher(ctx context.Context, params exporter.CreateSettings, pcfg PusherConfig, p source.Provider, attrs pcommon.Map) {
func RunPusher(ctx context.Context, params exporter.CreateSettings, pcfg PusherConfig, p source.Provider, attrs pcommon.Map, reporter *inframetadata.Reporter) {
// Push metadata every 30 minutes
ticker := time.NewTicker(30 * time.Minute)
defer ticker.Stop()
defer params.Logger.Debug("Shut down host metadata routine")
pusher := NewPusher(params, pcfg)

// Get host metadata from resources and fill missing info using our exporter.
// Currently we only retrieve it once but still send the same payload
Expand All @@ -169,23 +168,18 @@ func RunPusher(ctx context.Context, params exporter.CreateSettings, pcfg PusherC
hostMetadata = metadataFromAttributes(attrs)
}
fillHostMetadata(params, pcfg, p, &hostMetadata)

// Run one first time at startup
if err := pusher.Push(ctx, hostMetadata); err != nil {
params.Logger.Warn("Initial host metadata failed", zap.Error(err))
} else {
params.Logger.Info("Sent initial host metadata")
// Consume one first time
if err := reporter.ConsumeHostMetadata(hostMetadata); err != nil {
params.Logger.Warn("Failed to consume host metadata", zap.Any("payload", hostMetadata))
}

for {
select {
case <-ctx.Done():
return
case <-ticker.C: // Send host metadata
if err := pusher.Push(ctx, hostMetadata); err != nil {
params.Logger.Warn("Sending host metadata failed", zap.Error(err))
} else {
params.Logger.Info("Sent host metadata")
case <-ticker.C:
if err := reporter.ConsumeHostMetadata(hostMetadata); err != nil {
params.Logger.Warn("Failed to consume host metadata", zap.Any("payload", hostMetadata))
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions exporter/datadogexporter/internal/hostmetadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"net/http/httptest"
"os"
"testing"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/azure"
Expand All @@ -23,6 +25,7 @@ import (
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/testutil"
)
Expand Down Expand Up @@ -229,7 +232,7 @@ func TestPusher(t *testing.T) {
params := exportertest.NewNopCreateSettings()
params.BuildInfo = mockBuildInfo

hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "")
hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname")
require.NoError(t, err)

attrs := testutil.NewAttributeMap(map[string]string{
Expand All @@ -242,7 +245,11 @@ func TestPusher(t *testing.T) {
defer server.Close()
pcfg.MetricsEndpoint = server.URL

go RunPusher(ctx, params, pcfg, hostProvider, attrs)
pusher := NewPusher(mockExporterCreateSettings, pcfg)
reporter, err := inframetadata.NewReporter(zap.NewNop(), pusher, 1*time.Second)
require.NoError(t, err)

go RunPusher(ctx, params, pcfg, hostProvider, attrs, reporter)

body := <-server.MetadataChan
var recvMetadata payload.HostMetadata
Expand Down
2 changes: 1 addition & 1 deletion exporter/datadogexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (exp *logsExporter) consumeLogs(_ context.Context, ld plog.Logs) (err error
if ld.ResourceLogs().Len() > 0 {
attrs = ld.ResourceLogs().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs)
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs, exp.metadataReporter)
})

// Consume resources for host metadata
Expand Down
2 changes: 1 addition & 1 deletion exporter/datadogexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metr
if md.ResourceMetrics().Len() > 0 {
attrs = md.ResourceMetrics().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs)
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs, exp.metadataReporter)
})

// Consume resources for host metadata
Expand Down
2 changes: 1 addition & 1 deletion exporter/datadogexporter/traces_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (exp *traceExporter) consumeTraces(
if td.ResourceSpans().Len() > 0 {
attrs = td.ResourceSpans().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs)
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs, exp.metadataReporter)
})

// Consume resources for host metadata
Expand Down

0 comments on commit 5e25c8b

Please sign in to comment.