
[exporter/loadbalancing] Fix panic on a sub-exporter shutdown (open-telemetry#31456)

Fix panic when a sub-exporter is shut down while still handling
requests. This change wraps each sub-exporter with an additional wait
group to ensure that exporters are shut down only after they finish
processing data.

Fixes open-telemetry#31410

It also includes some small related refactoring changes; I can extract them
into separate PRs if needed.
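
For orientation, here is a minimal Go sketch of the wait-group wrapper this change introduces. The wrapped_exporter.go diff itself is collapsed in this view, so the type name, constructor, and consumeWG field are taken from the hunks that are visible below; everything else here is an assumption, not the merged implementation.

    package loadbalancingexporter

    import (
        "context"
        "sync"

        "go.opentelemetry.io/collector/component"
    )

    // wrappedExporter pairs a sub-exporter with a WaitGroup that counts
    // in-flight consume calls so that Shutdown can drain them first.
    // Per the log_exporter.go hunks below, the real wrapper also handles
    // the signal-specific Consume* calls, which are omitted here.
    type wrappedExporter struct {
        component.Component
        consumeWG sync.WaitGroup
    }

    func newWrappedExporter(exp component.Component) *wrappedExporter {
        return &wrappedExporter{Component: exp}
    }

    // Shutdown blocks until every in-flight consume call has released the
    // WaitGroup, then shuts the wrapped exporter down.
    func (we *wrappedExporter) Shutdown(ctx context.Context) error {
        we.consumeWG.Wait()
        return we.Component.Shutdown(ctx)
    }
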
dmitryax authored and XinRanZhAWS committed Mar 13, 2024
1 parent 09d856f commit 86e9e02
Showing 10 changed files with 358 additions and 183 deletions.
22 changes: 22 additions & 0 deletions .chloggen/fix-load-balancing-exp.yaml
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: exporter/loadbalancing

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix panic when a sub-exporter is shut down while still handling requests.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31410]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
51 changes: 19 additions & 32 deletions exporter/loadbalancingexporter/loadbalancer.go
@@ -24,32 +24,24 @@ var (
errMultipleResolversProvided = errors.New("only one resolver should be specified")
)

var _ loadBalancer = (*loadBalancerImp)(nil)

type componentFactory func(ctx context.Context, endpoint string) (component.Component, error)

type loadBalancer interface {
component.Component
Endpoint(identifier []byte) string
Exporter(endpoint string) (component.Component, error)
}

type loadBalancerImp struct {
type loadBalancer struct {
logger *zap.Logger
host component.Host

res resolver
ring *hashRing

componentFactory componentFactory
exporters map[string]component.Component
exporters map[string]*wrappedExporter

stopped bool
updateLock sync.RWMutex
}

// Create new load balancer
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) {
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancer, error) {
oCfg := cfg.(*Config)

if oCfg.Resolver.DNS != nil && oCfg.Resolver.Static != nil {
@@ -90,21 +82,21 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
return nil, errNoResolver
}

return &loadBalancerImp{
return &loadBalancer{
logger: params.Logger,
res: res,
componentFactory: factory,
exporters: map[string]component.Component{},
exporters: map[string]*wrappedExporter{},
}, nil
}

func (lb *loadBalancerImp) Start(ctx context.Context, host component.Host) error {
func (lb *loadBalancer) Start(ctx context.Context, host component.Host) error {
lb.res.onChange(lb.onBackendChanges)
lb.host = host
return lb.res.start(ctx)
}

func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
func (lb *loadBalancer) onBackendChanges(resolved []string) {
newRing := newHashRing(resolved)

if !newRing.equal(lb.ring) {
@@ -122,7 +114,7 @@ func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
}
}

func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []string) {
func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []string) {
for _, endpoint := range endpoints {
endpoint = endpointWithPort(endpoint)

@@ -132,12 +124,12 @@ func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []
lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
continue
}

if err = exp.Start(ctx, lb.host); err != nil {
we := newWrappedExporter(exp)
if err = we.Start(ctx, lb.host); err != nil {
lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
continue
}
lb.exporters[endpoint] = exp
lb.exporters[endpoint] = we
}
}
}
@@ -149,7 +141,7 @@ func endpointWithPort(endpoint string) string {
return endpoint
}

func (lb *loadBalancerImp) removeExtraExporters(ctx context.Context, endpoints []string) {
func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []string) {
endpointsWithPort := make([]string, len(endpoints))
for i, e := range endpoints {
endpointsWithPort[i] = endpointWithPort(e)
@@ -172,29 +164,24 @@ func endpointFound(endpoint string, endpoints []string) bool {
return false
}

func (lb *loadBalancerImp) Shutdown(context.Context) error {
func (lb *loadBalancer) Shutdown(context.Context) error {
lb.stopped = true
return nil
}

func (lb *loadBalancerImp) Endpoint(identifier []byte) string {
lb.updateLock.RLock()
defer lb.updateLock.RUnlock()

return lb.ring.endpointFor(identifier)
}

func (lb *loadBalancerImp) Exporter(endpoint string) (component.Component, error) {
// exporterAndEndpoint returns the exporter and the endpoint for the given identifier.
func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter, string, error) {
// NOTE: make rolling updates of next tier of collectors work. currently, this may cause
// data loss because the latest batches sent to outdated backend will never find their way out.
// for details: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690
lb.updateLock.RLock()
defer lb.updateLock.RUnlock()
endpoint := lb.ring.endpointFor(identifier)
exp, found := lb.exporters[endpointWithPort(endpoint)]
lb.updateLock.RUnlock()
if !found {
// something is really wrong... how come we couldn't find the exporter??
return nil, fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
}

return exp, nil
return exp, endpoint, nil
}
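
To show how the new helper is meant to be used, here is a hedged sketch of the consume path in a signal exporter. It mirrors the log_exporter.go hunks further down; the function name and signature are illustrative only, and the endpoint tagging and latency metrics are omitted.

    package loadbalancingexporter

    import (
        "context"

        "go.opentelemetry.io/collector/pdata/plog"
    )

    // consumeWithDrainGuard is a hypothetical helper illustrating the new call
    // pattern: resolve the wrapped exporter for the routing key, then hold its
    // WaitGroup for the duration of the send so a concurrent Shutdown waits.
    func consumeWithDrainGuard(ctx context.Context, lb *loadBalancer, ld plog.Logs, balancingKey [16]byte) error {
        le, _, err := lb.exporterAndEndpoint(balancingKey[:])
        if err != nil {
            return err
        }

        // Register this batch as in-flight; wrappedExporter.Shutdown waits for
        // the WaitGroup before shutting the sub-exporter down.
        le.consumeWG.Add(1)
        defer le.consumeWG.Done()

        // ConsumeLogs is called on the wrapper itself in the hunks below.
        return le.ConsumeLogs(ctx, ld)
    }
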
10 changes: 5 additions & 5 deletions exporter/loadbalancingexporter/loadbalancer_test.go
@@ -136,7 +136,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) {
require.NoError(t, err)

// test
e := p.Endpoint([]byte{128, 128, 0, 0})
_, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0})

// verify
assert.Equal(t, "", e)
@@ -376,19 +376,19 @@ func TestFailedExporterInRing(t *testing.T) {

// test
// this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info
_, err = p.Exporter(p.Endpoint([]byte{128, 128, 0, 0}))
_, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0})

// verify
assert.Error(t, err)

// test
// this service name will reach the endpoint-2 -- see the consistent hashing tests for more info
_, err = p.Exporter(p.Endpoint([]byte("get-recommendations-1")))
_, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1"))

// verify
assert.Error(t, err)
}

func newNopMockExporter() component.Component {
return mockComponent{}
func newNopMockExporter() *wrappedExporter {
return newWrappedExporter(mockComponent{})
}
12 changes: 4 additions & 8 deletions exporter/loadbalancingexporter/log_exporter.go
@@ -5,7 +5,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
@@ -26,7 +25,7 @@ import (
var _ exporter.Logs = (*logExporterImp)(nil)

type logExporterImp struct {
loadBalancer loadBalancer
loadBalancer *loadBalancer

started bool
shutdownWg sync.WaitGroup
@@ -87,16 +86,13 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
balancingKey = random()
}

endpoint := e.loadBalancer.Endpoint(balancingKey[:])
exp, err := e.loadBalancer.Exporter(endpoint)
le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:])
if err != nil {
return err
}

le, ok := exp.(exporter.Logs)
if !ok {
return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", exp)
}
le.consumeWG.Add(1)
defer le.consumeWG.Done()

start := time.Now()
err = le.ConsumeLogs(ctx, ld)
79 changes: 65 additions & 14 deletions exporter/loadbalancingexporter/log_exporter_test.go
@@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) {
assert.Len(t, sink.AllLogs(), 1)
}

// this test validates that the endpoints can be changed concurrently while the exporter is consuming logs.
func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) {
consumeStarted := make(chan struct{})
consumeDone := make(chan struct{})

// imitate a slow exporter
te := &mockLogsExporter{Component: mockComponent{}}
te.consumelogsfn = func(ctx context.Context, td plog.Logs) error {
close(consumeStarted)
time.Sleep(50 * time.Millisecond)
return te.consumeErr
}
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return te, nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig())
require.NotNil(t, p)
require.NoError(t, err)

endpoints := []string{"endpoint-1"}
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(ctx context.Context) ([]string, error) {
return endpoints, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

go func() {
assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs()))
close(consumeDone)
}()

// update endpoint while consuming logs
<-consumeStarted
endpoints = []string{"endpoint-2"}
endpoint, err := lb.res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, endpoints, endpoint)
<-consumeDone
}

func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331")

@@ -360,19 +412,17 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {

counter1 := &atomic.Int64{}
counter2 := &atomic.Int64{}
defaultExporters := map[string]component.Component{
"127.0.0.1:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
defaultExporters := map[string]*wrappedExporter{
"127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
counter1.Add(1)
// simulate an unreachable backend
time.Sleep(10 * time.Second)
return nil
},
),
"127.0.0.2:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
})),
"127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
counter2.Add(1)
return nil
},
),
})),
}

// test
@@ -458,15 +508,21 @@ func simpleLogWithoutID() plog.Logs {
type mockLogsExporter struct {
component.Component
consumelogsfn func(ctx context.Context, ld plog.Logs) error
consumeErr error
}

func (e *mockLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *mockLogsExporter) Shutdown(context.Context) error {
e.consumeErr = errors.New("exporter is shut down")
return nil
}

func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if e.consumelogsfn == nil {
return nil
return e.consumeErr
}
return e.consumelogsfn(ctx, ld)
}
@@ -484,10 +540,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e
}

func newNopMockLogsExporter() exporter.Logs {
return &mockLogsExporter{
Component: mockComponent{},
consumelogsfn: func(ctx context.Context, ld plog.Logs) error {
return nil
},
}
return &mockLogsExporter{Component: mockComponent{}}
}
