Skip to content

Commit

Permalink
Fix goroutine leaks in several packages (#5026)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #5006

## Description of the changes
- Fix goroutine leaks in several packages

## How was this change tested?
- go test

---------

Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro authored Dec 22, 2023
1 parent baed450 commit db20c5b
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 33 deletions.
1 change: 1 addition & 0 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (c ClientWrapper) MultiSearch() es.MultiSearchService {

// Close closes ESClient and flushes all data to the storage.
func (c ClientWrapper) Close() error {
c.client.Stop()
return c.bulkService.Close()
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/jtracer/jtracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func initHelper(
otelExporter func(ctx context.Context) (sdktrace.SpanExporter, error),
otelResource func(ctx context.Context, svc string) (*resource.Resource, error),
) (*sdktrace.TracerProvider, error) {
res, err := otelResource(ctx, svc)
if err != nil {
return nil, err
}

traceExporter, err := otelExporter(ctx)
if err != nil {
return nil, err
Expand All @@ -84,11 +89,6 @@ func initHelper(
// span processor to aggregate spans before export.
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)

res, err := otelResource(ctx, svc)
if err != nil {
return nil, err
}

tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSpanProcessor(bsp),
sdktrace.WithResource(res),
Expand Down
12 changes: 10 additions & 2 deletions pkg/jtracer/jtracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/goleak"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -59,15 +60,17 @@ func TestInitHelperExporterError(t *testing.T) {
func(ctx context.Context) (sdktrace.SpanExporter, error) {
return nil, fakeErr
},
nil,
func(ctx context.Context, svc string) (*resource.Resource, error) {
return nil, nil
},
)
require.Error(t, err)
assert.EqualError(t, err, fakeErr.Error())
}

func TestInitHelperResourceError(t *testing.T) {
fakeErr := errors.New("fakeResourceError")
_, err := initHelper(
tp, err := initHelper(
context.Background(),
"svc",
otelExporter,
Expand All @@ -76,5 +79,10 @@ func TestInitHelperResourceError(t *testing.T) {
},
)
require.Error(t, err)
require.Nil(t, tp)
assert.EqualError(t, err, fakeErr.Error())
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
30 changes: 17 additions & 13 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
uatomic "go.uber.org/atomic"
"go.uber.org/goleak"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -196,6 +197,7 @@ func TestResizeUp(t *testing.T) {
// wait until we are signaled that we can finish
releaseConsumers.Wait()
})
defer q.Stop()

assert.True(t, q.Produce("a")) // in process
firstConsumer.Wait()
Expand Down Expand Up @@ -244,6 +246,7 @@ func TestResizeDown(t *testing.T) {
// wait until we are signaled that we can finish
releaseConsumers.Wait()
})
defer q.Stop()

assert.True(t, q.Produce("a")) // in process
consumer.Wait()
Expand Down Expand Up @@ -297,6 +300,7 @@ func TestResizeOldQueueIsDrained(t *testing.T) {
expected.Done()
}
})
defer q.Stop()

assert.True(t, q.Produce("a"))
consumerReady.Wait()
Expand All @@ -315,43 +319,43 @@ func TestResizeOldQueueIsDrained(t *testing.T) {
}

func TestNoopResize(t *testing.T) {
q := NewBoundedQueue(2, func(item interface{}) {
})
q := NewBoundedQueue(2, func(item interface{}) {})

assert.False(t, q.Resize(2))
}

func TestZeroSize(t *testing.T) {
q := NewBoundedQueue(0, func(item interface{}) {
})
q := NewBoundedQueue(0, func(item interface{}) {})

q.StartConsumers(1, func(item interface{}) {
})
q.StartConsumers(1, func(item interface{}) {})
defer q.Stop()

assert.False(t, q.Produce("a")) // in process
}

func BenchmarkBoundedQueue(b *testing.B) {
q := NewBoundedQueue(1000, func(item interface{}) {
})

q.StartConsumers(10, func(item interface{}) {
})
q := NewBoundedQueue(1000, func(item interface{}) {})
q.StartConsumers(10, func(item interface{}) {})
defer q.Stop()

for n := 0; n < b.N; n++ {
q.Produce(n)
}
}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedQueue(1000, func(item interface{}) {
})
q := NewBoundedQueue(1000, func(item interface{}) {})

q.StartConsumersWithFactory(10, func() Consumer {
return ConsumerFunc(func(item interface{}) {})
})
defer q.Stop()

for n := 0; n < b.N; n++ {
q.Produce(n)
}
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
12 changes: 6 additions & 6 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,15 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom
newCfg := *cfg // copy by value
newCfg.Password = newPassword
newCfg.PasswordFilePath = "" // avoid error that both are set

newClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err))
} else {
oldClient := *client.Swap(&newClient)
if oldClient != nil {
if err := oldClient.Close(); err != nil {
f.logger.Error("failed to close Elasticsearch client", zap.Error(err))
}
return
}
if oldClient := *client.Swap(&newClient); oldClient != nil {
if err := oldClient.Close(); err != nil {
f.logger.Error("failed to close Elasticsearch client", zap.Error(err))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
f.archiveConfig = &escfg.Configuration{}
f.newClientFn = (&mockClientBuilder{}).NewClient
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
defer f.Close()
r, err := f.CreateSpanWriter()
require.Error(t, err)
assert.Nil(t, r)
Expand All @@ -132,6 +133,7 @@ func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) {
f.archiveConfig = &escfg.Configuration{}
f.newClientFn = (&mockClientBuilder{}).NewClient
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
defer f.Close()
w, err := f.CreateSpanWriter()
require.EqualError(t, err, "--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
assert.Nil(t, w)
Expand Down Expand Up @@ -205,6 +207,7 @@ func TestCreateTemplateError(t *testing.T) {
f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close()
w, err := f.CreateSpanWriter()
assert.Nil(t, w)
assert.Error(t, err, "template-error")
Expand All @@ -216,6 +219,7 @@ func TestILMDisableTemplateCreation(t *testing.T) {
f.archiveConfig = &escfg.Configuration{}
f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
defer f.Close()
require.NoError(t, err)
_, err = f.CreateSpanWriter()
assert.Nil(t, err) // as the createTemplate is not called, CreateSpanWriter should not return an error
Expand Down
25 changes: 25 additions & 0 deletions plugin/storage/es/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package es

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
5 changes: 5 additions & 0 deletions plugin/storage/grpc/memory/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/goleak"

"github.com/jaegertracing/jaeger/plugin/storage/memory"
)
Expand All @@ -35,3 +36,7 @@ func TestPluginUsesMemoryStorage(t *testing.T) {
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter())
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
17 changes: 11 additions & 6 deletions plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
logger.Info("Kafka factory",
zap.Any("producer builder", f.Builder),
zap.Any("topic", f.options.Topic))
p, err := f.NewProducer(logger)
if err != nil {
return err
}
f.producer = p
switch f.options.Encoding {
case EncodingProto:
f.marshaller = newProtobufMarshaller()
Expand All @@ -90,6 +85,11 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
default:
return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'")
}
p, err := f.NewProducer(logger)
if err != nil {
return err
}
f.producer = p
return nil
}

Expand All @@ -112,5 +112,10 @@ var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
func (f *Factory) Close() error {
return f.options.Config.TLS.Close()
var errs []error
if f.producer != nil {
errs = append(errs, f.producer.Close())
}
errs = append(errs, f.options.Config.TLS.Close())
return errors.Join(errs...)
}
4 changes: 3 additions & 1 deletion plugin/storage/kafka/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestKafkaFactoryEncoding(t *testing.T) {
f.Builder = &mockProducerBuilder{t: t}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.IsType(t, test.marshaller, f.marshaller)
assert.NoError(t, f.Close())
})
}
}
Expand Down Expand Up @@ -151,7 +152,8 @@ func TestKafkaFactoryDoesNotLogPassword(t *testing.T) {
require.NoError(t, err)
logger.Sync()

require.NotContains(t, logbuf.String(), "SECRET", "log output must not contain password in clear text")
assert.NotContains(t, logbuf.String(), "SECRET", "log output must not contain password in clear text")
assert.NoError(t, f.Close())
})
}
}
Expand Down
25 changes: 25 additions & 0 deletions plugin/storage/kafka/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
1 change: 1 addition & 0 deletions plugin/storage/kafka/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var _ spanstore.Writer = &SpanWriter{}

func withSpanWriter(t *testing.T, fn func(span *model.Span, w *spanWriterTest)) {
serviceMetrics := metricstest.NewFactory(100 * time.Millisecond)
defer serviceMetrics.Stop()
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
producer := saramaMocks.NewAsyncProducer(t, saramaConfig)
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/memory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func TestPublishOpts(t *testing.T) {
f.InitFromViper(v, zap.NewNop())

baseMetrics := metricstest.NewFactory(time.Second)
defer baseMetrics.Stop()
forkFactory := metricstest.NewFactory(time.Second)
defer forkFactory.Stop()
metricsFactory := fork.New("internal", forkFactory, baseMetrics)
assert.NoError(t, f.Initialize(metricsFactory, zap.NewNop()))

Expand Down
25 changes: 25 additions & 0 deletions plugin/storage/memory/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

0 comments on commit db20c5b

Please sign in to comment.