diff --git a/.chloggen/fix-load-balancing-exp.yaml b/.chloggen/fix-load-balancing-exp.yaml new file mode 100755 index 000000000000..dfb55ab1205c --- /dev/null +++ b/.chloggen/fix-load-balancing-exp.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/loadbalancing + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix panic when a sub-exporter is shut down while still handling requests. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31410] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go index 862111d06c76..a156ad9d9394 100644 --- a/exporter/loadbalancingexporter/loadbalancer.go +++ b/exporter/loadbalancingexporter/loadbalancer.go @@ -24,17 +24,9 @@ var ( errMultipleResolversProvided = errors.New("only one resolver should be specified") ) -var _ loadBalancer = (*loadBalancerImp)(nil) - type componentFactory func(ctx context.Context, endpoint string) (component.Component, error) -type loadBalancer interface { - component.Component - Endpoint(identifier []byte) string - Exporter(endpoint string) (component.Component, error) -} - -type loadBalancerImp struct { +type loadBalancer struct { logger *zap.Logger host component.Host @@ -42,14 +34,14 @@ type loadBalancerImp struct { ring *hashRing componentFactory componentFactory - exporters map[string]component.Component + exporters map[string]*wrappedExporter stopped bool updateLock sync.RWMutex } // Create new load balancer -func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) { +func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancer, error) { oCfg := cfg.(*Config) if oCfg.Resolver.DNS != nil && oCfg.Resolver.Static != nil { @@ -90,21 +82,21 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto return nil, errNoResolver } - return &loadBalancerImp{ + return &loadBalancer{ logger: params.Logger, res: res, componentFactory: factory, - exporters: map[string]component.Component{}, + exporters: map[string]*wrappedExporter{}, }, nil } -func (lb *loadBalancerImp) Start(ctx context.Context, host component.Host) error { +func (lb *loadBalancer) Start(ctx context.Context, host component.Host) error { lb.res.onChange(lb.onBackendChanges) lb.host = host return lb.res.start(ctx) } -func (lb *loadBalancerImp) 
onBackendChanges(resolved []string) { +func (lb *loadBalancer) onBackendChanges(resolved []string) { newRing := newHashRing(resolved) if !newRing.equal(lb.ring) { @@ -122,7 +114,7 @@ func (lb *loadBalancerImp) onBackendChanges(resolved []string) { } } -func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []string) { +func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []string) { for _, endpoint := range endpoints { endpoint = endpointWithPort(endpoint) @@ -132,12 +124,12 @@ func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints [] lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err)) continue } - - if err = exp.Start(ctx, lb.host); err != nil { + we := newWrappedExporter(exp) + if err = we.Start(ctx, lb.host); err != nil { lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err)) continue } - lb.exporters[endpoint] = exp + lb.exporters[endpoint] = we } } } @@ -149,7 +141,7 @@ func endpointWithPort(endpoint string) string { return endpoint } -func (lb *loadBalancerImp) removeExtraExporters(ctx context.Context, endpoints []string) { +func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []string) { endpointsWithPort := make([]string, len(endpoints)) for i, e := range endpoints { endpointsWithPort[i] = endpointWithPort(e) @@ -172,29 +164,24 @@ func endpointFound(endpoint string, endpoints []string) bool { return false } -func (lb *loadBalancerImp) Shutdown(context.Context) error { +func (lb *loadBalancer) Shutdown(context.Context) error { lb.stopped = true return nil } -func (lb *loadBalancerImp) Endpoint(identifier []byte) string { - lb.updateLock.RLock() - defer lb.updateLock.RUnlock() - - return lb.ring.endpointFor(identifier) -} - -func (lb *loadBalancerImp) Exporter(endpoint string) (component.Component, error) { +// exporterAndEndpoint returns the exporter and 
the endpoint for the given identifier. +func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter, string, error) { // NOTE: make rolling updates of next tier of collectors work. currently, this may cause // data loss because the latest batches sent to outdated backend will never find their way out. // for details: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690 lb.updateLock.RLock() + defer lb.updateLock.RUnlock() + endpoint := lb.ring.endpointFor(identifier) exp, found := lb.exporters[endpointWithPort(endpoint)] - lb.updateLock.RUnlock() if !found { // something is really wrong... how come we couldn't find the exporter?? - return nil, fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint) + return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint) } - return exp, nil + return exp, endpoint, nil } diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 5d8c74e0f14b..26ad546a2cd8 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -136,7 +136,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) { require.NoError(t, err) // test - e := p.Endpoint([]byte{128, 128, 0, 0}) + _, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0}) // verify assert.Equal(t, "", e) @@ -376,19 +376,19 @@ func TestFailedExporterInRing(t *testing.T) { // test // this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info - _, err = p.Exporter(p.Endpoint([]byte{128, 128, 0, 0})) + _, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0}) // verify assert.Error(t, err) // test // this service name will reach the endpoint-2 -- see the consistent hashing tests for more info - _, err = p.Exporter(p.Endpoint([]byte("get-recommendations-1"))) + _, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1")) // verify 
assert.Error(t, err) } -func newNopMockExporter() component.Component { - return mockComponent{} +func newNopMockExporter() *wrappedExporter { + return newWrappedExporter(mockComponent{}) } diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go index 6cfe82edca65..c59c2c189e8a 100644 --- a/exporter/loadbalancingexporter/log_exporter.go +++ b/exporter/loadbalancingexporter/log_exporter.go @@ -5,7 +5,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry import ( "context" - "fmt" "math/rand" "sync" "time" @@ -26,7 +25,7 @@ import ( var _ exporter.Logs = (*logExporterImp)(nil) type logExporterImp struct { - loadBalancer loadBalancer + loadBalancer *loadBalancer started bool shutdownWg sync.WaitGroup @@ -87,16 +86,13 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error { balancingKey = random() } - endpoint := e.loadBalancer.Endpoint(balancingKey[:]) - exp, err := e.loadBalancer.Exporter(endpoint) + le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:]) if err != nil { return err } - le, ok := exp.(exporter.Logs) - if !ok { - return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", exp) - } + le.consumeWG.Add(1) + defer le.consumeWG.Done() start := time.Now() err = le.ConsumeLogs(ctx, ld) diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go index 30694a5a4235..50d8eafdf594 100644 --- a/exporter/loadbalancingexporter/log_exporter_test.go +++ b/exporter/loadbalancingexporter/log_exporter_test.go @@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) { assert.Len(t, sink.AllLogs(), 1) } +// this test validates that the exporter can concurrently change the endpoints while consuming logs. 
+func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) { + consumeStarted := make(chan struct{}) + consumeDone := make(chan struct{}) + + // imitate a slow exporter + te := &mockLogsExporter{Component: mockComponent{}} + te.consumelogsfn = func(ctx context.Context, td plog.Logs) error { + close(consumeStarted) + time.Sleep(50 * time.Millisecond) + return te.consumeErr + } + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return te, nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + + endpoints := []string{"endpoint-1"} + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return endpoints, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + go func() { + assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs())) + close(consumeDone) + }() + + // update endpoint while consuming logs + <-consumeStarted + endpoints = []string{"endpoint-2"} + endpoint, err := lb.res.resolve(context.Background()) + require.NoError(t, err) + require.Equal(t, endpoints, endpoint) + <-consumeDone +} + func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331") @@ -360,19 +412,17 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} - defaultExporters := map[string]component.Component{ - "127.0.0.1:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { + defaultExporters := 
map[string]*wrappedExporter{ + "127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - }, - ), - "127.0.0.2:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { + })), + "127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { counter2.Add(1) return nil - }, - ), + })), } // test @@ -458,15 +508,21 @@ func simpleLogWithoutID() plog.Logs { type mockLogsExporter struct { component.Component consumelogsfn func(ctx context.Context, ld plog.Logs) error + consumeErr error } func (e *mockLogsExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } +func (e *mockLogsExporter) Shutdown(context.Context) error { + e.consumeErr = errors.New("exporter is shut down") + return nil +} + func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { if e.consumelogsfn == nil { - return nil + return e.consumeErr } return e.consumelogsfn(ctx, ld) } @@ -484,10 +540,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e } func newNopMockLogsExporter() exporter.Logs { - return &mockLogsExporter{ - Component: mockComponent{}, - consumelogsfn: func(ctx context.Context, ld plog.Logs) error { - return nil - }, - } + return &mockLogsExporter{Component: mockComponent{}} } diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index 0b685801e346..9cd5ea10a676 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -28,11 +28,10 @@ import ( var _ exporter.Metrics = (*metricExporterImp)(nil) -type exporterMetrics map[component.Component]map[string]pmetric.Metrics -type endpointMetrics map[string]pmetric.Metrics +type exporterMetrics map[*wrappedExporter]pmetric.Metrics 
type metricExporterImp struct { - loadBalancer loadBalancer + loadBalancer *loadBalancer routingKey routingKey stopped bool @@ -82,13 +81,10 @@ func (e *metricExporterImp) Shutdown(context.Context) error { } func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - var errs error - var exp component.Component - batches := batchpersignal.SplitMetrics(md) exporterSegregatedMetrics := make(exporterMetrics) - endpointSegregatedMetrics := make(endpointMetrics) + endpoints := make(map[*wrappedExporter]string) for _, batch := range batches { routingIds, err := routingIdentifiersFromMetrics(batch, e.routingKey) @@ -97,61 +93,45 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri } for rid := range routingIds { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + exp, endpoint, err := e.loadBalancer.exporterAndEndpoint([]byte(rid)) if err != nil { return err } - _, ok := exp.(exporter.Metrics) - if !ok { - return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", exp) - } - _, ok = endpointSegregatedMetrics[endpoint] + _, ok := exporterSegregatedMetrics[exp] if !ok { - endpointSegregatedMetrics[endpoint] = pmetric.NewMetrics() + exp.consumeWG.Add(1) + exporterSegregatedMetrics[exp] = pmetric.NewMetrics() } - endpointSegregatedMetrics[endpoint] = mergeMetrics(endpointSegregatedMetrics[endpoint], batch) + exporterSegregatedMetrics[exp] = mergeMetrics(exporterSegregatedMetrics[exp], batch) - _, ok = exporterSegregatedMetrics[exp] - if !ok { - exporterSegregatedMetrics[exp] = endpointMetrics{} - } - exporterSegregatedMetrics[exp][endpoint] = endpointSegregatedMetrics[endpoint] + endpoints[exp] = endpoint } } - errs = multierr.Append(errs, e.consumeMetric(ctx, exporterSegregatedMetrics)) - - return errs -} + var errs error -func (e *metricExporterImp) consumeMetric(ctx context.Context, exporterSegregatedMetrics 
exporterMetrics) error { - var err error - - for exp, endpointMetrics := range exporterSegregatedMetrics { - for endpoint, md := range endpointMetrics { - te, _ := exp.(exporter.Metrics) - - start := time.Now() - err = te.ConsumeMetrics(ctx, md) - duration := time.Since(start) - - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) - } + for exp, metrics := range exporterSegregatedMetrics { + start := time.Now() + err := exp.ConsumeMetrics(ctx, metrics) + exp.consumeWG.Done() + duration := time.Since(start) + errs = multierr.Append(errs, err) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) } } - return err + return errs } func routingIdentifiersFromMetrics(mds pmetric.Metrics, key routingKey) (map[string]bool, error) { diff --git a/exporter/loadbalancingexporter/metrics_exporter_test.go b/exporter/loadbalancingexporter/metrics_exporter_test.go index 4abc5c0717df..c454c744263c 100644 --- a/exporter/loadbalancingexporter/metrics_exporter_test.go +++ b/exporter/loadbalancingexporter/metrics_exporter_test.go @@ -199,6 +199,58 @@ func TestConsumeMetrics(t *testing.T) { } +// this test validates that the exporter can concurrently change the endpoints while consuming metrics. 
+func TestConsumeMetrics_ConcurrentResolverChange(t *testing.T) { + consumeStarted := make(chan struct{}) + consumeDone := make(chan struct{}) + + // imitate a slow exporter + te := &mockMetricsExporter{Component: mockComponent{}} + te.ConsumeMetricsFn = func(ctx context.Context, td pmetric.Metrics) error { + close(consumeStarted) + time.Sleep(50 * time.Millisecond) + return te.consumeErr + } + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return te, nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newMetricsExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + + endpoints := []string{"endpoint-1"} + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return endpoints, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + go func() { + assert.NoError(t, p.ConsumeMetrics(context.Background(), simpleMetricsWithResource())) + close(consumeDone) + }() + + // update endpoint while consuming metrics + <-consumeStarted + endpoints = []string{"endpoint-2"} + endpoint, err := lb.res.resolve(context.Background()) + require.NoError(t, err) + require.Equal(t, endpoints, endpoint) + <-consumeDone +} + func TestConsumeMetricsServiceBased(t *testing.T) { componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { return newNopMockMetricsExporter(), nil @@ -609,19 +661,17 @@ func TestRollingUpdatesWhenConsumeMetrics(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} - defaultExporters := map[string]component.Component{ - "127.0.0.1:4317": newMockMetricsExporter(func(ctx context.Context, td 
pmetric.Metrics) error { + defaultExporters := map[string]*wrappedExporter{ + "127.0.0.1:4317": newWrappedExporter(newMockMetricsExporter(func(ctx context.Context, td pmetric.Metrics) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - }, - ), - "127.0.0.2:4317": newMockMetricsExporter(func(ctx context.Context, td pmetric.Metrics) error { + })), + "127.0.0.2:4317": newWrappedExporter(newMockMetricsExporter(func(ctx context.Context, td pmetric.Metrics) error { counter2.Add(1) return nil - }, - ), + })), } // test @@ -849,6 +899,7 @@ func appendSimpleMetricWithID(dest pmetric.ResourceMetrics, id string) { type mockMetricsExporter struct { component.Component ConsumeMetricsFn func(ctx context.Context, td pmetric.Metrics) error + consumeErr error } func newMockMetricsExporter(consumeMetricsFn func(ctx context.Context, td pmetric.Metrics) error) exporter.Metrics { @@ -859,21 +910,21 @@ func newMockMetricsExporter(consumeMetricsFn func(ctx context.Context, td pmetri } func newNopMockMetricsExporter() exporter.Metrics { - return &mockMetricsExporter{ - Component: mockComponent{}, - ConsumeMetricsFn: func(ctx context.Context, md pmetric.Metrics) error { - return nil - }, - } + return &mockMetricsExporter{Component: mockComponent{}} } func (e *mockMetricsExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } +func (e *mockMetricsExporter) Shutdown(context.Context) error { + e.consumeErr = errors.New("exporter is shut down") + return nil +} + func (e *mockMetricsExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { if e.ConsumeMetricsFn == nil { - return nil + return e.consumeErr } return e.ConsumeMetricsFn(ctx, md) } diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index d3ce46d8e2f6..d7e1dd1e5029 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ 
b/exporter/loadbalancingexporter/trace_exporter.go @@ -24,11 +24,10 @@ import ( var _ exporter.Traces = (*traceExporterImp)(nil) -type exporterTraces map[component.Component]map[string]ptrace.Traces -type endpointTraces map[string]ptrace.Traces +type exporterTraces map[*wrappedExporter]ptrace.Traces type traceExporterImp struct { - loadBalancer loadBalancer + loadBalancer *loadBalancer routingKey routingKey stopped bool @@ -80,13 +79,10 @@ func (e *traceExporterImp) Shutdown(context.Context) error { } func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - var errs error - var exp component.Component - batches := batchpersignal.SplitTraces(td) exporterSegregatedTraces := make(exporterTraces) - endpointSegregatedTraces := make(endpointTraces) + endpoints := make(map[*wrappedExporter]string) for _, batch := range batches { routingID, err := routingIdentifiersFromTraces(batch, e.routingKey) if err != nil { @@ -94,61 +90,45 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) } for rid := range routingID { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + exp, endpoint, err := e.loadBalancer.exporterAndEndpoint([]byte(rid)) if err != nil { return err } - _, ok := exp.(exporter.Traces) - if !ok { - return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) - } - _, ok = endpointSegregatedTraces[endpoint] + _, ok := exporterSegregatedTraces[exp] if !ok { - endpointSegregatedTraces[endpoint] = ptrace.NewTraces() + exp.consumeWG.Add(1) + exporterSegregatedTraces[exp] = ptrace.NewTraces() } - endpointSegregatedTraces[endpoint] = mergeTraces(endpointSegregatedTraces[endpoint], batch) + exporterSegregatedTraces[exp] = mergeTraces(exporterSegregatedTraces[exp], batch) - _, ok = exporterSegregatedTraces[exp] - if !ok { - exporterSegregatedTraces[exp] = endpointTraces{} - } - 
exporterSegregatedTraces[exp][endpoint] = endpointSegregatedTraces[endpoint] + endpoints[exp] = endpoint } } - errs = multierr.Append(errs, e.consumeTrace(ctx, exporterSegregatedTraces)) - - return errs -} + var errs error -func (e *traceExporterImp) consumeTrace(ctx context.Context, exporterSegregatedTraces exporterTraces) error { - var err error - - for exp, endpointTraces := range exporterSegregatedTraces { - for endpoint, td := range endpointTraces { - te, _ := exp.(exporter.Traces) - - start := time.Now() - err = te.ConsumeTraces(ctx, td) - duration := time.Since(start) - - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) - } + for exp, td := range exporterSegregatedTraces { + start := time.Now() + err := exp.ConsumeTraces(ctx, td) + exp.consumeWG.Done() + errs = multierr.Append(errs, err) + duration := time.Since(start) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoints[exp]), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) } } - return err + return errs } func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) { diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 1f83b59d9364..c98b0ef14971 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -155,6 +155,59 @@ func TestConsumeTraces(t *testing.T) { assert.Nil(t, res) } +// This test 
validates that the exporter can concurrently change the endpoints while consuming traces. +func TestConsumeTraces_ConcurrentResolverChange(t *testing.T) { + consumeStarted := make(chan struct{}) + consumeDone := make(chan struct{}) + + // imitate a slow exporter + te := &mockTracesExporter{Component: mockComponent{}} + te.ConsumeTracesFn = func(ctx context.Context, td ptrace.Traces) error { + close(consumeStarted) + time.Sleep(50 * time.Millisecond) + return te.consumeErr + } + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return te, nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + assert.Equal(t, p.routingKey, traceIDRouting) + + endpoints := []string{"endpoint-1"} + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return endpoints, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + go func() { + assert.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + close(consumeDone) + }() + + // update endpoint while consuming traces + <-consumeStarted + endpoints = []string{"endpoint-2"} + endpoint, err := lb.res.resolve(context.Background()) + require.NoError(t, err) + require.Equal(t, endpoints, endpoint) + <-consumeDone +} + func TestConsumeTracesServiceBased(t *testing.T) { componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { return newNopMockTracesExporter(), nil @@ -465,19 +518,17 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { counter1 := &atomic.Int64{} counter2 := &atomic.Int64{} - 
defaultExporters := map[string]component.Component{ - "127.0.0.1:4317": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { + defaultExporters := map[string]*wrappedExporter{ + "127.0.0.1:4317": newWrappedExporter(newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { counter1.Add(1) // simulate an unreachable backend time.Sleep(10 * time.Second) return nil - }, - ), - "127.0.0.2:4317": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { + })), + "127.0.0.2:4317": newWrappedExporter(newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { counter2.Add(1) return nil - }, - ), + })), } // test @@ -688,6 +739,7 @@ func serviceBasedRoutingConfig() *Config { type mockTracesExporter struct { component.Component ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error + consumeErr error } func newMockTracesExporter(consumeTracesFn func(ctx context.Context, td ptrace.Traces) error) exporter.Traces { @@ -698,12 +750,12 @@ func newMockTracesExporter(consumeTracesFn func(ctx context.Context, td ptrace.T } func newNopMockTracesExporter() exporter.Traces { - return &mockTracesExporter{ - Component: mockComponent{}, - ConsumeTracesFn: func(ctx context.Context, td ptrace.Traces) error { - return nil - }, - } + return &mockTracesExporter{Component: mockComponent{}} +} + +func (e *mockTracesExporter) Shutdown(context.Context) error { + e.consumeErr = errors.New("exporter is shut down") + return nil } func (e *mockTracesExporter) Capabilities() consumer.Capabilities { @@ -712,7 +764,7 @@ func (e *mockTracesExporter) Capabilities() consumer.Capabilities { func (e *mockTracesExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { if e.ConsumeTracesFn == nil { - return nil + return e.consumeErr } return e.ConsumeTracesFn(ctx, td) } diff --git a/exporter/loadbalancingexporter/wrapped_exporter.go b/exporter/loadbalancingexporter/wrapped_exporter.go new file mode 100644 index 
000000000000..cb2491fc23ff --- /dev/null +++ b/exporter/loadbalancingexporter/wrapped_exporter.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// wrappedExporter is an exporter that waits for the data processing to complete before shutting down. +// consumeWG has to be incremented explicitly by the consumer of the wrapped exporter. +type wrappedExporter struct { + component.Component + consumeWG sync.WaitGroup +} + +func newWrappedExporter(exp component.Component) *wrappedExporter { + return &wrappedExporter{Component: exp} +} + +func (we *wrappedExporter) Shutdown(ctx context.Context) error { + we.consumeWG.Wait() + return we.Component.Shutdown(ctx) +} + +func (we *wrappedExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + te, ok := we.Component.(exporter.Traces) + if !ok { + return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", we.Component) + } + return te.ConsumeTraces(ctx, td) +} + +func (we *wrappedExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + me, ok := we.Component.(exporter.Metrics) + if !ok { + return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", we.Component) + } + return me.ConsumeMetrics(ctx, md) +} + +func (we *wrappedExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + le, ok := we.Component.(exporter.Logs) + if !ok { + return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", 
we.Component) + } + return le.ConsumeLogs(ctx, ld) +}