From b048815f00eeefa97c8a990e8ffd1f4b46924774 Mon Sep 17 00:00:00 2001 From: Stanley Liu Date: Wed, 29 May 2024 12:53:04 -0400 Subject: [PATCH 1/4] Refactor wg into traces and metrics --- exporter/datadogexporter/factory.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index c8840f5de19d..49659a4f5fe7 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -103,7 +103,8 @@ type factory struct { attributesTranslator *attributes.Translator attributesErr error - wg sync.WaitGroup // waits for agent to exit + tracesWG sync.WaitGroup // waits for agent to exit + metricsWG sync.WaitGroup // waits for consumeStatsPayload to exit registry *featuregate.Registry } @@ -155,9 +156,9 @@ func (f *factory) TraceAgent(ctx context.Context, params exporter.CreateSettings if err != nil { return nil, err } - f.wg.Add(1) + f.tracesWG.Add(1) go func() { - defer f.wg.Done() + defer f.tracesWG.Done() agnt.Run() }() return agnt, nil @@ -255,9 +256,9 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config { func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) { for i := 0; i < runtime.NumCPU(); i++ { - f.wg.Add(1) + f.metricsWG.Add(1) go func() { - defer f.wg.Done() + defer f.metricsWG.Done() for { select { case <-ctx.Done(): @@ -350,8 +351,8 @@ func (f *factory) createMetricsExporter( } else { exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, metadataReporter, statsIn) if metricsErr != nil { - cancel() // first cancel context - f.wg.Wait() // then wait for shutdown + cancel() // first cancel context + f.metricsWG.Wait() // then wait for shutdown return nil, metricsErr } pushMetricsFn = exp.PushMetricsDataScrubbed @@ -370,8 +371,8 @@ func (f *factory) createMetricsExporter( exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithShutdown(func(context.Context) error { - cancel() // first cancel context - f.wg.Wait() // then wait for shutdown + cancel() // first cancel context + f.metricsWG.Wait() // then wait for shutdown f.StopReporter() statsWriter.Stop() if statsIn != nil { @@ -462,7 +463,7 @@ func (f *factory) createTracesExporter( tracex, err2 := newTracesExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent, metadataReporter) if err2 != nil { cancel() - f.wg.Wait() // then wait for shutdown + f.tracesWG.Wait() // then wait for shutdown return nil, err2 } pusher = tracex.consumeTraces @@ -543,7 +544,6 @@ func (f *factory) createLogsExporter( exp, err := newLogsExporter(ctx, set, cfg, &f.onceMetadata, attributesTranslator, hostProvider, metadataReporter) if err != nil { cancel() - f.wg.Wait() // then wait for shutdown return nil, err } pusher = exp.consumeLogs From 486cd6ce82c40a0181a993c4b5fa99975c51461a Mon Sep 17 00:00:00 2001 From: Stanley Liu Date: Wed, 29 May 2024 13:42:33 -0400 Subject: [PATCH 2/4] Add changelog --- .chloggen/stanley.liu_refactor-wg.yaml | 27 ++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/stanley.liu_refactor-wg.yaml diff --git a/.chloggen/stanley.liu_refactor-wg.yaml b/.chloggen/stanley.liu_refactor-wg.yaml new file mode 100644 index 000000000000..352f501e6e10 --- /dev/null +++ b/.chloggen/stanley.liu_refactor-wg.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/datadog + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fixes a potential race condition when the traces exporter and metrics exporter are both shutting down. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33291] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 6b4bafece24f8faec4f7b96f9efb345c319efdb1 Mon Sep 17 00:00:00 2001 From: Stanley Liu Date: Wed, 29 May 2024 16:47:47 -0400 Subject: [PATCH 3/4] Refactor fields and add test --- exporter/datadogexporter/factory.go | 36 ++++++++-------- exporter/datadogexporter/factory_test.go | 54 ++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 49659a4f5fe7..f61a9dfa91a2 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -103,9 +103,6 @@ type factory struct { attributesTranslator *attributes.Translator attributesErr error - tracesWG sync.WaitGroup // waits for agent to exit - metricsWG sync.WaitGroup // waits for consumeStatsPayload to exit - registry *featuregate.Registry } @@ -151,14 +148,14 @@ func (f *factory) StopReporter() { }) } -func (f *factory) TraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) { +func (f *factory) TraceAgent(ctx context.Context, wg sync.WaitGroup, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) { agnt, err := newTraceAgent(ctx, params, cfg, sourceProvider, metricsclient.InitializeMetricClient(params.MeterProvider, metricsclient.ExporterSourceTag), attrsTranslator) if err != nil { return nil, err } - f.tracesWG.Add(1) + wg.Add(1) go func() { - defer f.tracesWG.Done() + defer wg.Done() agnt.Run() }() return agnt, nil @@ -254,11 +251,11 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config { return cfg } -func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) { +func (f *factory) consumeStatsPayload(ctx context.Context, wg sync.WaitGroup, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) { for i := 0; i < runtime.NumCPU(); i++ { - f.metricsWG.Add(1) + wg.Add(1) go func() { - defer f.metricsWG.Done() + defer wg.Done() for { select { case <-ctx.Done(): @@ -306,7 +303,11 @@ func (f *factory) createMetricsExporter( return nil, fmt.Errorf("failed to build attributes translator: %w", err) } - var pushMetricsFn consumer.ConsumeMetricsFunc + var ( + pushMetricsFn consumer.ConsumeMetricsFunc + wg sync.WaitGroup // waits for consumeStatsPayload to exit + ) + acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider, attrsTranslator) if err != nil { cancel() @@ -322,7 +323,7 @@ func (f *factory) createMetricsExporter( statsIn := make(chan []byte, 1000) statsv := set.BuildInfo.Command + set.BuildInfo.Version - f.consumeStatsPayload(ctx, statsIn, statsToAgent, statsv, acfg.AgentVersion, set.Logger) + f.consumeStatsPayload(ctx, wg, statsIn, statsToAgent, statsv, acfg.AgentVersion, set.Logger) pcfg := newMetadataConfigfromConfig(cfg) metadataReporter, err := f.Reporter(set, pcfg) if err != nil { @@ -351,8 +352,8 @@ func (f *factory) createMetricsExporter( } else { exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, metadataReporter, statsIn) if metricsErr != nil { - cancel() // first cancel context - f.metricsWG.Wait() // then wait for shutdown + cancel() // first cancel context + wg.Wait() // then wait for shutdown return nil, metricsErr } pushMetricsFn = exp.PushMetricsDataScrubbed @@ -371,8 +372,8 @@ func (f *factory) createMetricsExporter( exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithShutdown(func(context.Context) error { - cancel() // first cancel context - f.metricsWG.Wait() // then wait for shutdown + cancel() // first cancel context + wg.Wait() // then wait for shutdown f.StopReporter() statsWriter.Stop() if statsIn != nil { @@ -409,6 +410,7 @@ func (f *factory) createTracesExporter( var ( pusher consumer.ConsumeTracesFunc stop component.ShutdownFunc + wg sync.WaitGroup // waits for agent to exit ) hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname) @@ -424,7 +426,7 @@ func (f *factory) createTracesExporter( return nil, fmt.Errorf("failed to build attributes translator: %w", err) } - traceagent, err := f.TraceAgent(ctx, set, cfg, hostProvider, attrsTranslator) + traceagent, err := f.TraceAgent(ctx, wg, set, cfg, hostProvider, attrsTranslator) if err != nil { cancel() return nil, fmt.Errorf("failed to start trace-agent: %w", err) @@ -463,7 +465,7 @@ func (f *factory) createTracesExporter( tracex, err2 := newTracesExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent, metadataReporter) if err2 != nil { cancel() - f.tracesWG.Wait() // then wait for shutdown + wg.Wait() // then wait for shutdown return nil, err2 } pusher = tracex.consumeTraces diff --git a/exporter/datadogexporter/factory_test.go b/exporter/datadogexporter/factory_test.go index 7d8584c3d17d..1d391fadb8a8 100644 --- a/exporter/datadogexporter/factory_test.go +++ b/exporter/datadogexporter/factory_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload" @@ -738,3 +739,56 @@ func TestOnlyMetadata(t *testing.T) { recvMetadata := <-server.MetadataChan assert.Equal(t, recvMetadata.InternalHostname, "custom-hostname") } + +func TestStopExporters(t *testing.T) { + server := testutil.DatadogServerMock() + defer server.Close() + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "api").String()) + require.NoError(t, err) + require.NoError(t, component.UnmarshalConfig(sub, cfg)) + + c := cfg.(*Config) + c.Metrics.TCPAddrConfig.Endpoint = server.URL + c.HostMetadata.Enabled = false + + ctx := context.Background() + expTraces, err := factory.CreateTracesExporter( + ctx, + exportertest.NewNopCreateSettings(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, expTraces) + expMetrics, err := factory.CreateMetricsExporter( + ctx, + exportertest.NewNopCreateSettings(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, expMetrics) + + err = expTraces.Start(ctx, nil) + assert.NoError(t, err) + err = expMetrics.Start(ctx, nil) + assert.NoError(t, err) + + finishShutdown := make(chan bool) + go func() { + expMetrics.Shutdown(ctx) + expTraces.Shutdown(ctx) + finishShutdown <- true + }() + + select { + case <-finishShutdown: + break + case <-time.After(time.Second * 10): + t.Fatal("Timed out") + } +} From 14665bdab1216524bfbff8349b0d0d9ba1c020e3 Mon Sep 17 00:00:00 2001 From: Stanley Liu Date: Wed, 29 May 2024 16:55:10 -0400 Subject: [PATCH 4/4] Fix linter --- exporter/datadogexporter/factory.go | 8 ++++---- exporter/datadogexporter/factory_test.go | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index f61a9dfa91a2..816d73cb669d 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -148,7 +148,7 @@ func (f *factory) StopReporter() { }) } -func (f *factory) TraceAgent(ctx context.Context, wg sync.WaitGroup, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) { +func (f *factory) TraceAgent(ctx context.Context, wg *sync.WaitGroup, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider, attrsTranslator *attributes.Translator) (*agent.Agent, error) { agnt, err := newTraceAgent(ctx, params, cfg, sourceProvider, metricsclient.InitializeMetricClient(params.MeterProvider, metricsclient.ExporterSourceTag), attrsTranslator) if err != nil { return nil, err @@ -251,7 +251,7 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config { return cfg } -func (f *factory) consumeStatsPayload(ctx context.Context, wg sync.WaitGroup, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) { +func (f *factory) consumeStatsPayload(ctx context.Context, wg *sync.WaitGroup, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, agentVersion string, logger *zap.Logger) { for i := 0; i < runtime.NumCPU(); i++ { wg.Add(1) go func() { @@ -323,7 +323,7 @@ func (f *factory) createMetricsExporter( statsIn := make(chan []byte, 1000) statsv := set.BuildInfo.Command + set.BuildInfo.Version - f.consumeStatsPayload(ctx, wg, statsIn, statsToAgent, statsv, acfg.AgentVersion, set.Logger) + f.consumeStatsPayload(ctx, &wg, statsIn, statsToAgent, statsv, acfg.AgentVersion, set.Logger) pcfg := newMetadataConfigfromConfig(cfg) metadataReporter, err := f.Reporter(set, pcfg) if err != nil { @@ -426,7 +426,7 @@ func (f *factory) createTracesExporter( return nil, fmt.Errorf("failed to build attributes translator: %w", err) } - traceagent, err := f.TraceAgent(ctx, wg, set, cfg, hostProvider, attrsTranslator) + traceagent, err := f.TraceAgent(ctx, &wg, set, cfg, hostProvider, attrsTranslator) if err != nil { cancel() return nil, fmt.Errorf("failed to start trace-agent: %w", err) diff --git a/exporter/datadogexporter/factory_test.go b/exporter/datadogexporter/factory_test.go index 1d391fadb8a8..ea0b612b4b14 100644 --- a/exporter/datadogexporter/factory_test.go +++ b/exporter/datadogexporter/factory_test.go @@ -780,8 +780,10 @@ func TestStopExporters(t *testing.T) { finishShutdown := make(chan bool) go func() { - expMetrics.Shutdown(ctx) - expTraces.Shutdown(ctx) + err = expMetrics.Shutdown(ctx) + assert.NoError(t, err) + err = expTraces.Shutdown(ctx) + assert.NoError(t, err) finishShutdown <- true }()