From 51e983046c4d80f176b3ba9fe707f3c2fb613b48 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Tue, 5 Mar 2024 02:45:45 -0800 Subject: [PATCH] [exporter/loadbalancing] Fix panic on a sub-exporter shutdown (#31456) Fix panic when a sub-exporter is shut down while still handling requests. This change wraps exporters with an additional working group to ensure that exporters are shut down only after they finish processing data. Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31410 It has some small related refactoring changes. I can extract them in separate PRs if needed. --- .chloggen/fix-load-balancing-exp.yaml | 22 +++++ .../loadbalancingexporter/loadbalancer.go | 51 +++++------- .../loadbalancer_test.go | 10 +-- .../loadbalancingexporter/log_exporter.go | 12 +-- .../log_exporter_test.go | 79 ++++++++++++++---- .../loadbalancingexporter/metrics_exporter.go | 76 +++++++----------- .../metrics_exporter_test.go | 79 ++++++++++++++---- .../loadbalancingexporter/trace_exporter.go | 76 +++++++----------- .../trace_exporter_test.go | 80 +++++++++++++++---- .../loadbalancingexporter/wrapped_exporter.go | 56 +++++++++++++ 10 files changed, 358 insertions(+), 183 deletions(-) create mode 100755 .chloggen/fix-load-balancing-exp.yaml create mode 100644 exporter/loadbalancingexporter/wrapped_exporter.go diff --git a/.chloggen/fix-load-balancing-exp.yaml b/.chloggen/fix-load-balancing-exp.yaml new file mode 100755 index 000000000000..dfb55ab1205c --- /dev/null +++ b/.chloggen/fix-load-balancing-exp.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/loadbalancing + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: Fix panic when a sub-exporter is shut down while still handling requests. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31410] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go index 862111d06c76..a156ad9d9394 100644 --- a/exporter/loadbalancingexporter/loadbalancer.go +++ b/exporter/loadbalancingexporter/loadbalancer.go @@ -24,17 +24,9 @@ var ( errMultipleResolversProvided = errors.New("only one resolver should be specified") ) -var _ loadBalancer = (*loadBalancerImp)(nil) - type componentFactory func(ctx context.Context, endpoint string) (component.Component, error) -type loadBalancer interface { - component.Component - Endpoint(identifier []byte) string - Exporter(endpoint string) (component.Component, error) -} - -type loadBalancerImp struct { +type loadBalancer struct { logger *zap.Logger host component.Host @@ -42,14 +34,14 @@ type loadBalancerImp struct { ring *hashRing componentFactory componentFactory - exporters map[string]component.Component + exporters map[string]*wrappedExporter stopped bool updateLock sync.RWMutex } // Create new load balancer -func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) { +func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancer, error) { oCfg := cfg.(*Config) if oCfg.Resolver.DNS != nil && 
oCfg.Resolver.Static != nil { @@ -90,21 +82,21 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto return nil, errNoResolver } - return &loadBalancerImp{ + return &loadBalancer{ logger: params.Logger, res: res, componentFactory: factory, - exporters: map[string]component.Component{}, + exporters: map[string]*wrappedExporter{}, }, nil } -func (lb *loadBalancerImp) Start(ctx context.Context, host component.Host) error { +func (lb *loadBalancer) Start(ctx context.Context, host component.Host) error { lb.res.onChange(lb.onBackendChanges) lb.host = host return lb.res.start(ctx) } -func (lb *loadBalancerImp) onBackendChanges(resolved []string) { +func (lb *loadBalancer) onBackendChanges(resolved []string) { newRing := newHashRing(resolved) if !newRing.equal(lb.ring) { @@ -122,7 +114,7 @@ func (lb *loadBalancerImp) onBackendChanges(resolved []string) { } } -func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []string) { +func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []string) { for _, endpoint := range endpoints { endpoint = endpointWithPort(endpoint) @@ -132,12 +124,12 @@ func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints [] lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err)) continue } - - if err = exp.Start(ctx, lb.host); err != nil { + we := newWrappedExporter(exp) + if err = we.Start(ctx, lb.host); err != nil { lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err)) continue } - lb.exporters[endpoint] = exp + lb.exporters[endpoint] = we } } } @@ -149,7 +141,7 @@ func endpointWithPort(endpoint string) string { return endpoint } -func (lb *loadBalancerImp) removeExtraExporters(ctx context.Context, endpoints []string) { +func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []string) { endpointsWithPort := make([]string, 
len(endpoints)) for i, e := range endpoints { endpointsWithPort[i] = endpointWithPort(e) @@ -172,29 +164,24 @@ func endpointFound(endpoint string, endpoints []string) bool { return false } -func (lb *loadBalancerImp) Shutdown(context.Context) error { +func (lb *loadBalancer) Shutdown(context.Context) error { lb.stopped = true return nil } -func (lb *loadBalancerImp) Endpoint(identifier []byte) string { - lb.updateLock.RLock() - defer lb.updateLock.RUnlock() - - return lb.ring.endpointFor(identifier) -} - -func (lb *loadBalancerImp) Exporter(endpoint string) (component.Component, error) { +// exporterAndEndpoint returns the exporter and the endpoint for the given identifier. +func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter, string, error) { // NOTE: make rolling updates of next tier of collectors work. currently, this may cause // data loss because the latest batches sent to outdated backend will never find their way out. // for details: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690 lb.updateLock.RLock() + defer lb.updateLock.RUnlock() + endpoint := lb.ring.endpointFor(identifier) exp, found := lb.exporters[endpointWithPort(endpoint)] - lb.updateLock.RUnlock() if !found { // something is really wrong... how come we couldn't find the exporter?? 
- return nil, fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint) + return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint) } - return exp, nil + return exp, endpoint, nil } diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 5d8c74e0f14b..26ad546a2cd8 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -136,7 +136,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) { require.NoError(t, err) // test - e := p.Endpoint([]byte{128, 128, 0, 0}) + _, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0}) // verify assert.Equal(t, "", e) @@ -376,19 +376,19 @@ func TestFailedExporterInRing(t *testing.T) { // test // this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info - _, err = p.Exporter(p.Endpoint([]byte{128, 128, 0, 0})) + _, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0}) // verify assert.Error(t, err) // test // this service name will reach the endpoint-2 -- see the consistent hashing tests for more info - _, err = p.Exporter(p.Endpoint([]byte("get-recommendations-1"))) + _, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1")) // verify assert.Error(t, err) } -func newNopMockExporter() component.Component { - return mockComponent{} +func newNopMockExporter() *wrappedExporter { + return newWrappedExporter(mockComponent{}) } diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go index 6cfe82edca65..c59c2c189e8a 100644 --- a/exporter/loadbalancingexporter/log_exporter.go +++ b/exporter/loadbalancingexporter/log_exporter.go @@ -5,7 +5,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry import ( "context" - "fmt" "math/rand" "sync" "time" @@ -26,7 +25,7 @@ import ( var _ exporter.Logs = (*logExporterImp)(nil) type logExporterImp struct 
{ - loadBalancer loadBalancer + loadBalancer *loadBalancer started bool shutdownWg sync.WaitGroup @@ -87,16 +86,13 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error { balancingKey = random() } - endpoint := e.loadBalancer.Endpoint(balancingKey[:]) - exp, err := e.loadBalancer.Exporter(endpoint) + le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:]) if err != nil { return err } - le, ok := exp.(exporter.Logs) - if !ok { - return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", exp) - } + le.consumeWG.Add(1) + defer le.consumeWG.Done() start := time.Now() err = le.ConsumeLogs(ctx, ld) diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go index 30694a5a4235..50d8eafdf594 100644 --- a/exporter/loadbalancingexporter/log_exporter_test.go +++ b/exporter/loadbalancingexporter/log_exporter_test.go @@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) { assert.Len(t, sink.AllLogs(), 1) } +// this test validates that the exporter can concurrently change the endpoints while consuming logs. 
+func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) { + consumeStarted := make(chan struct{}) + consumeDone := make(chan struct{}) + + // imitate a slow exporter + te := &mockLogsExporter{Component: mockComponent{}} + te.consumelogsfn = func(ctx context.Context, td plog.Logs) error { + close(consumeStarted) + time.Sleep(50 * time.Millisecond) + return te.consumeErr + } + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return te, nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + + endpoints := []string{"endpoint-1"} + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return endpoints, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + go func() { + assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs())) + close(consumeDone) + }() + + // update endpoint while consuming logs + <-consumeStarted + endpoints = []string{"endpoint-2"} + endpoint, err := lb.res.resolve(context.Background()) + require.NoError(t, err) + require.Equal(t, endpoints, endpoint) + <-consumeDone +} + func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331") @@ -360,19 +412,17 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} - defaultExporters := map[string]component.Component{ - "127.0.0.1:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { + defaultExporters := 
map[string]*wrappedExporter{ + "127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - }, - ), - "127.0.0.2:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { + })), + "127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { counter2.Add(1) return nil - }, - ), + })), } // test @@ -458,15 +508,21 @@ func simpleLogWithoutID() plog.Logs { type mockLogsExporter struct { component.Component consumelogsfn func(ctx context.Context, ld plog.Logs) error + consumeErr error } func (e *mockLogsExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } +func (e *mockLogsExporter) Shutdown(context.Context) error { + e.consumeErr = errors.New("exporter is shut down") + return nil +} + func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { if e.consumelogsfn == nil { - return nil + return e.consumeErr } return e.consumelogsfn(ctx, ld) } @@ -484,10 +540,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e } func newNopMockLogsExporter() exporter.Logs { - return &mockLogsExporter{ - Component: mockComponent{}, - consumelogsfn: func(ctx context.Context, ld plog.Logs) error { - return nil - }, - } + return &mockLogsExporter{Component: mockComponent{}} } diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index 0b685801e346..9cd5ea10a676 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -28,11 +28,10 @@ import ( var _ exporter.Metrics = (*metricExporterImp)(nil) -type exporterMetrics map[component.Component]map[string]pmetric.Metrics -type endpointMetrics map[string]pmetric.Metrics +type exporterMetrics map[*wrappedExporter]pmetric.Metrics 
type metricExporterImp struct { - loadBalancer loadBalancer + loadBalancer *loadBalancer routingKey routingKey stopped bool @@ -82,13 +81,10 @@ func (e *metricExporterImp) Shutdown(context.Context) error { } func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - var errs error - var exp component.Component - batches := batchpersignal.SplitMetrics(md) exporterSegregatedMetrics := make(exporterMetrics) - endpointSegregatedMetrics := make(endpointMetrics) + endpoints := make(map[*wrappedExporter]string) for _, batch := range batches { routingIds, err := routingIdentifiersFromMetrics(batch, e.routingKey) @@ -97,61 +93,45 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri } for rid := range routingIds { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + exp, endpoint, err := e.loadBalancer.exporterAndEndpoint([]byte(rid)) if err != nil { return err } - _, ok := exp.(exporter.Metrics) - if !ok { - return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp) - } - _, ok = endpointSegregatedMetrics[endpoint] + _, ok := exporterSegregatedMetrics[exp] if !ok { - endpointSegregatedMetrics[endpoint] = pmetric.NewMetrics() + exp.consumeWG.Add(1) + exporterSegregatedMetrics[exp] = pmetric.NewMetrics() } - endpointSegregatedMetrics[endpoint] = mergeMetrics(endpointSegregatedMetrics[endpoint], batch) + exporterSegregatedMetrics[exp] = mergeMetrics(exporterSegregatedMetrics[exp], batch) - _, ok = exporterSegregatedMetrics[exp] - if !ok { - exporterSegregatedMetrics[exp] = endpointMetrics{} - } - exporterSegregatedMetrics[exp][endpoint] = endpointSegregatedMetrics[endpoint] + endpoints[exp] = endpoint } } - errs = multierr.Append(errs, e.consumeMetric(ctx, exporterSegregatedMetrics)) - - return errs -} + var errs error -func (e *metricExporterImp) consumeMetric(ctx context.Context, exporterSegregatedMetrics 
exporterMetrics) error { - var err error - - for exp, endpointMetrics := range exporterSegregatedMetrics { - for endpoint, md := range endpointMetrics { - te, _ := exp.(exporter.Metrics) - - start := time.Now() - err = te.ConsumeMetrics(ctx, md) - duration := time.Since(start) - - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) - } + for exp, metrics := range exporterSegregatedMetrics { + start := time.Now() + err := exp.ConsumeMetrics(ctx, metrics) + exp.consumeWG.Done() + duration := time.Since(start) + errs = multierr.Append(errs, err) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) } } - return err + return errs } func routingIdentifiersFromMetrics(mds pmetric.Metrics, key routingKey) (map[string]bool, error) { diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index 4abc5c0717df..c454c744263c 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -199,6 +199,58 @@ func TestConsumeMetrics(t *testing.T) { } +// this test validates that the exporter can concurrently change the endpoints while consuming metrics. 
+func TestConsumeMetrics_ConcurrentResolverChange(t *testing.T) { + consumeStarted := make(chan struct{}) + consumeDone := make(chan struct{}) + + // imitate a slow exporter + te := &mockMetricsExporter{Component: mockComponent{}} + te.ConsumeMetricsFn = func(ctx context.Context, td pmetric.Metrics) error { + close(consumeStarted) + time.Sleep(50 * time.Millisecond) + return te.consumeErr + } + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return te, nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newMetricsExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + + endpoints := []string{"endpoint-1"} + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return endpoints, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + go func() { + assert.NoError(t, p.ConsumeMetrics(context.Background(), simpleMetricsWithResource())) + close(consumeDone) + }() + + // update endpoint while consuming metrics + <-consumeStarted + endpoints = []string{"endpoint-2"} + endpoint, err := lb.res.resolve(context.Background()) + require.NoError(t, err) + require.Equal(t, endpoints, endpoint) + <-consumeDone +} + func TestConsumeMetricsServiceBased(t *testing.T) { componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { return newNopMockMetricsExporter(), nil @@ -609,19 +661,17 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} - defaultExporters := map[string]component.Component{ - "127.0.0.1:4317": newMockMetricsExporter(func(ctx context.Context, td 
pmetric.Metrics) error { + defaultExporters := map[string]*wrappedExporter{ + "127.0.0.1:4317": newWrappedExporter(newMockMetricsExporter(func(ctx context.Context, td pmetric.Metrics) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - }, - ), - "127.0.0.2:4317": newMockMetricsExporter(func(ctx context.Context, td pmetric.Metrics) error { + })), + "127.0.0.2:4317": newWrappedExporter(newMockMetricsExporter(func(ctx context.Context, td pmetric.Metrics) error { counter2.Add(1) return nil - }, - ), + })), } // test @@ -849,6 +899,7 @@ func appendSimpleMetricWithID(dest pmetric.ResourceMetrics, id string) { type mockMetricsExporter struct { component.Component ConsumeMetricsFn func(ctx context.Context, td pmetric.Metrics) error + consumeErr error } func newMockMetricsExporter(consumeMetricsFn func(ctx context.Context, td pmetric.Metrics) error) exporter.Metrics { @@ -859,21 +910,21 @@ func newMockMetricsExporter(consumeMetricsFn func(ctx context.Context, td pmetri } func newNopMockMetricsExporter() exporter.Metrics { - return &mockMetricsExporter{ - Component: mockComponent{}, - ConsumeMetricsFn: func(ctx context.Context, md pmetric.Metrics) error { - return nil - }, - } + return &mockMetricsExporter{Component: mockComponent{}} } func (e *mockMetricsExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } +func (e *mockMetricsExporter) Shutdown(context.Context) error { + e.consumeErr = errors.New("exporter is shut down") + return nil +} + func (e *mockMetricsExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { if e.ConsumeMetricsFn == nil { - return nil + return e.consumeErr } return e.ConsumeMetricsFn(ctx, md) } diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index d3ce46d8e2f6..d7e1dd1e5029 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ 
b/exporter/loadbalancingexporter/trace_exporter.go @@ -24,11 +24,10 @@ import ( var _ exporter.Traces = (*traceExporterImp)(nil) -type exporterTraces map[component.Component]map[string]ptrace.Traces -type endpointTraces map[string]ptrace.Traces +type exporterTraces map[*wrappedExporter]ptrace.Traces type traceExporterImp struct { - loadBalancer loadBalancer + loadBalancer *loadBalancer routingKey routingKey stopped bool @@ -80,13 +79,10 @@ func (e *traceExporterImp) Shutdown(context.Context) error { } func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - var errs error - var exp component.Component - batches := batchpersignal.SplitTraces(td) exporterSegregatedTraces := make(exporterTraces) - endpointSegregatedTraces := make(endpointTraces) + endpoints := make(map[*wrappedExporter]string) for _, batch := range batches { routingID, err := routingIdentifiersFromTraces(batch, e.routingKey) if err != nil { @@ -94,61 +90,45 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) } for rid := range routingID { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + exp, endpoint, err := e.loadBalancer.exporterAndEndpoint([]byte(rid)) if err != nil { return err } - _, ok := exp.(exporter.Traces) - if !ok { - return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) - } - _, ok = endpointSegregatedTraces[endpoint] + _, ok := exporterSegregatedTraces[exp] if !ok { - endpointSegregatedTraces[endpoint] = ptrace.NewTraces() + exp.consumeWG.Add(1) + exporterSegregatedTraces[exp] = ptrace.NewTraces() } - endpointSegregatedTraces[endpoint] = mergeTraces(endpointSegregatedTraces[endpoint], batch) + exporterSegregatedTraces[exp] = mergeTraces(exporterSegregatedTraces[exp], batch) - _, ok = exporterSegregatedTraces[exp] - if !ok { - exporterSegregatedTraces[exp] = endpointTraces{} - } - 
exporterSegregatedTraces[exp][endpoint] = endpointSegregatedTraces[endpoint] + endpoints[exp] = endpoint } } - errs = multierr.Append(errs, e.consumeTrace(ctx, exporterSegregatedTraces)) - - return errs -} + var errs error -func (e *traceExporterImp) consumeTrace(ctx context.Context, exporterSegregatedTraces exporterTraces) error { - var err error - - for exp, endpointTraces := range exporterSegregatedTraces { - for endpoint, td := range endpointTraces { - te, _ := exp.(exporter.Traces) - - start := time.Now() - err = te.ConsumeTraces(ctx, td) - duration := time.Since(start) - - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) - } + for exp, td := range exporterSegregatedTraces { + start := time.Now() + err := exp.ConsumeTraces(ctx, td) + exp.consumeWG.Done() + errs = multierr.Append(errs, err) + duration := time.Since(start) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) } } - return err + return errs } func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) { diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 1f83b59d9364..c98b0ef14971 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -155,6 +155,59 @@ func TestConsumeTraces(t *testing.T) { assert.Nil(t, res) } +// This test 
validates that the exporter can concurrently change the endpoints while consuming traces. +func TestConsumeTraces_ConcurrentResolverChange(t *testing.T) { + consumeStarted := make(chan struct{}) + consumeDone := make(chan struct{}) + + // imitate a slow exporter + te := &mockTracesExporter{Component: mockComponent{}} + te.ConsumeTracesFn = func(ctx context.Context, td ptrace.Traces) error { + close(consumeStarted) + time.Sleep(50 * time.Millisecond) + return te.consumeErr + } + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return te, nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + assert.Equal(t, p.routingKey, traceIDRouting) + + endpoints := []string{"endpoint-1"} + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return endpoints, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + go func() { + assert.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + close(consumeDone) + }() + + // update endpoint while consuming traces + <-consumeStarted + endpoints = []string{"endpoint-2"} + endpoint, err := lb.res.resolve(context.Background()) + require.NoError(t, err) + require.Equal(t, endpoints, endpoint) + <-consumeDone +} + func TestConsumeTracesServiceBased(t *testing.T) { componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { return newNopMockTracesExporter(), nil @@ -465,19 +518,17 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} - 
defaultExporters := map[string]component.Component{ - "127.0.0.1:4317": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { + defaultExporters := map[string]*wrappedExporter{ + "127.0.0.1:4317": newWrappedExporter(newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - }, - ), - "127.0.0.2:4317": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { + })), + "127.0.0.2:4317": newWrappedExporter(newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { counter2.Add(1) return nil - }, - ), + })), } // test @@ -688,6 +739,7 @@ func serviceBasedRoutingConfig() *Config { type mockTracesExporter struct { component.Component ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error + consumeErr error } func newMockTracesExporter(consumeTracesFn func(ctx context.Context, td ptrace.Traces) error) exporter.Traces { @@ -698,12 +750,12 @@ func newMockTracesExporter(consumeTracesFn func(ctx context.Context, td ptrace.T } func newNopMockTracesExporter() exporter.Traces { - return &mockTracesExporter{ - Component: mockComponent{}, - ConsumeTracesFn: func(ctx context.Context, td ptrace.Traces) error { - return nil - }, - } + return &mockTracesExporter{Component: mockComponent{}} +} + +func (e *mockTracesExporter) Shutdown(context.Context) error { + e.consumeErr = errors.New("exporter is shut down") + return nil } func (e *mockTracesExporter) Capabilities() consumer.Capabilities { @@ -712,7 +764,7 @@ func (e *mockTracesExporter) Capabilities() consumer.Capabilities { func (e *mockTracesExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { if e.ConsumeTracesFn == nil { - return nil + return e.consumeErr } return e.ConsumeTracesFn(ctx, td) } diff --git a/exporter/loadbalancingexporter/wrapped_exporter.go b/exporter/loadbalancingexporter/wrapped_exporter.go new file mode 100644 index 
000000000000..cb2491fc23ff --- /dev/null +++ b/exporter/loadbalancingexporter/wrapped_exporter.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// wrappedExporter is an exporter that waits for the data processing to complete before shutting down. +// consumeWG has to be incremented explicitly by the consumer of the wrapped exporter. +type wrappedExporter struct { + component.Component + consumeWG sync.WaitGroup +} + +func newWrappedExporter(exp component.Component) *wrappedExporter { + return &wrappedExporter{Component: exp} +} + +func (we *wrappedExporter) Shutdown(ctx context.Context) error { + we.consumeWG.Wait() + return we.Component.Shutdown(ctx) +} + +func (we *wrappedExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + te, ok := we.Component.(exporter.Traces) + if !ok { + return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", we.Component) + } + return te.ConsumeTraces(ctx, td) +} + +func (we *wrappedExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + me, ok := we.Component.(exporter.Metrics) + if !ok { + return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", we.Component) + } + return me.ConsumeMetrics(ctx, md) +} + +func (we *wrappedExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + le, ok := we.Component.(exporter.Logs) + if !ok { + return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", 
we.Component) + } + return le.ConsumeLogs(ctx, ld) +}