diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index 72621340712..0ec118c5fc2 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -99,6 +99,7 @@ func (c ClientWrapper) MultiSearch() es.MultiSearchService { // Close closes ESClient and flushes all data to the storage. func (c ClientWrapper) Close() error { + c.client.Stop() return c.bulkService.Close() } diff --git a/pkg/jtracer/jtracer.go b/pkg/jtracer/jtracer.go index 4c93755d673..c909e0b01a2 100644 --- a/pkg/jtracer/jtracer.go +++ b/pkg/jtracer/jtracer.go @@ -75,6 +75,11 @@ func initHelper( otelExporter func(ctx context.Context) (sdktrace.SpanExporter, error), otelResource func(ctx context.Context, svc string) (*resource.Resource, error), ) (*sdktrace.TracerProvider, error) { + res, err := otelResource(ctx, svc) + if err != nil { + return nil, err + } + traceExporter, err := otelExporter(ctx) if err != nil { return nil, err @@ -84,11 +89,6 @@ func initHelper( // span processor to aggregate spans before export. bsp := sdktrace.NewBatchSpanProcessor(traceExporter) - res, err := otelResource(ctx, svc) - if err != nil { - return nil, err - } - tracerProvider := sdktrace.NewTracerProvider( sdktrace.WithSpanProcessor(bsp), sdktrace.WithResource(res), diff --git a/pkg/jtracer/jtracer_test.go b/pkg/jtracer/jtracer_test.go index c9e1b18b8f9..fe3d6452972 100644 --- a/pkg/jtracer/jtracer_test.go +++ b/pkg/jtracer/jtracer_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.uber.org/goleak" ) func TestNew(t *testing.T) { @@ -59,7 +60,9 @@ func TestInitHelperExporterError(t *testing.T) { func(ctx context.Context) (sdktrace.SpanExporter, error) { return nil, fakeErr }, - nil, + func(ctx context.Context, svc string) (*resource.Resource, error) { + return nil, nil + }, ) require.Error(t, err) assert.EqualError(t, err, fakeErr.Error()) @@ -67,7 +70,7 @@ func TestInitHelperExporterError(t *testing.T) { func TestInitHelperResourceError(t *testing.T) { fakeErr := errors.New("fakeResourceError") - _, err := initHelper( + tp, err := initHelper( context.Background(), "svc", otelExporter, @@ -76,5 +79,10 @@ func TestInitHelperResourceError(t *testing.T) { }, ) require.Error(t, err) + require.Nil(t, tp) assert.EqualError(t, err, fakeErr.Error()) } + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index 0a465f0682c..5d3a276f121 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" uatomic "go.uber.org/atomic" + "go.uber.org/goleak" "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -196,6 +197,7 @@ func TestResizeUp(t *testing.T) { // wait until we are signaled that we can finish releaseConsumers.Wait() }) + defer q.Stop() assert.True(t, q.Produce("a")) // in process firstConsumer.Wait() @@ -244,6 +246,7 @@ func TestResizeDown(t *testing.T) { // wait until we are signaled that we can finish releaseConsumers.Wait() }) + defer q.Stop() assert.True(t, q.Produce("a")) // in process consumer.Wait() @@ -297,6 +300,7 @@ func TestResizeOldQueueIsDrained(t *testing.T) { expected.Done() } }) + defer q.Stop() assert.True(t, q.Produce("a")) consumerReady.Wait() @@ -315,28 +319,24 @@ func TestResizeOldQueueIsDrained(t *testing.T) { } func TestNoopResize(t *testing.T) { - q := NewBoundedQueue(2, func(item interface{}) { - }) + q := NewBoundedQueue(2, func(item interface{}) {}) assert.False(t, q.Resize(2)) } func TestZeroSize(t *testing.T) { - q := NewBoundedQueue(0, func(item interface{}) { - }) + q := NewBoundedQueue(0, func(item interface{}) {}) - q.StartConsumers(1, func(item interface{}) { - }) + q.StartConsumers(1, func(item interface{}) {}) + defer q.Stop() assert.False(t, q.Produce("a")) // in process } func BenchmarkBoundedQueue(b *testing.B) { - q := NewBoundedQueue(1000, func(item interface{}) { - }) - - q.StartConsumers(10, func(item interface{}) { - }) + q := NewBoundedQueue(1000, func(item interface{}) {}) + q.StartConsumers(10, func(item interface{}) {}) + defer q.Stop() for n := 0; n < b.N; n++ { q.Produce(n) @@ -344,14 +344,18 @@ func BenchmarkBoundedQueue(b *testing.B) { } func BenchmarkBoundedQueueWithFactory(b *testing.B) { - q := NewBoundedQueue(1000, func(item interface{}) { - }) + q := NewBoundedQueue(1000, func(item interface{}) {}) q.StartConsumersWithFactory(10, func() Consumer { return ConsumerFunc(func(item interface{}) {}) }) + defer q.Stop() for n := 0; n < b.N; n++ { q.Produce(n) } } + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index c483596e047..8ce0de4f386 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -327,15 +327,15 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom newCfg := *cfg // copy by value newCfg.Password = newPassword newCfg.PasswordFilePath = "" // avoid error that both are set + newClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err)) - } else { - oldClient := *client.Swap(&newClient) - if oldClient != nil { - if err := oldClient.Close(); err != nil { - f.logger.Error("failed to close Elasticsearch client", zap.Error(err)) - } + return + } + if oldClient := *client.Swap(&newClient); oldClient != nil { + if err := oldClient.Close(); err != nil { + f.logger.Error("failed to close Elasticsearch client", zap.Error(err)) } } } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 5a3972c1e11..e0f114706a0 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -119,6 +119,7 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{}).NewClient assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + defer f.Close() r, err := f.CreateSpanWriter() require.Error(t, err) assert.Nil(t, r) @@ -132,6 +133,7 @@ func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{}).NewClient assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + defer f.Close() w, err := f.CreateSpanWriter() require.EqualError(t, err, "--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") assert.Nil(t, w) @@ -205,6 +207,7 @@ func TestCreateTemplateError(t *testing.T) { f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) + defer f.Close() w, err := f.CreateSpanWriter() assert.Nil(t, w) assert.Error(t, err, "template-error") @@ -216,6 +219,7 @@ func TestILMDisableTemplateCreation(t *testing.T) { f.archiveConfig = &escfg.Configuration{} f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) + defer f.Close() require.NoError(t, err) _, err = f.CreateSpanWriter() assert.Nil(t, err) // as the createTemplate is not called, CreateSpanWriter should not return an error diff --git a/plugin/storage/es/package_test.go b/plugin/storage/es/package_test.go new file mode 100644 index 00000000000..f54340d7caa --- /dev/null +++ b/plugin/storage/es/package_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package es + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/plugin/storage/grpc/memory/plugin_test.go b/plugin/storage/grpc/memory/plugin_test.go index e839d0ae7fc..40ea0030ec8 100644 --- a/plugin/storage/grpc/memory/plugin_test.go +++ b/plugin/storage/grpc/memory/plugin_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/goleak" "github.com/jaegertracing/jaeger/plugin/storage/memory" ) @@ -35,3 +36,7 @@ func TestPluginUsesMemoryStorage(t *testing.T) { assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader()) assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter()) } + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index c2dee28812d..ddd558fa556 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -77,11 +77,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) logger.Info("Kafka factory", zap.Any("producer builder", f.Builder), zap.Any("topic", f.options.Topic)) - p, err := f.NewProducer(logger) - if err != nil { - return err - } - f.producer = p switch f.options.Encoding { case EncodingProto: f.marshaller = newProtobufMarshaller() @@ -90,6 +85,11 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) default: return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'") } + p, err := f.NewProducer(logger) + if err != nil { + return err + } + f.producer = p return nil } @@ -112,5 +112,10 @@ var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory func (f *Factory) Close() error { - return f.options.Config.TLS.Close() + var errs []error + if f.producer != nil { + errs = append(errs, f.producer.Close()) + } + errs = append(errs, f.options.Config.TLS.Close()) + return errors.Join(errs...) } diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index e4dbbedddab..0bdc1817d4e 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -91,6 +91,7 @@ func TestKafkaFactoryEncoding(t *testing.T) { f.Builder = &mockProducerBuilder{t: t} assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) assert.IsType(t, test.marshaller, f.marshaller) + assert.NoError(t, f.Close()) }) } } @@ -151,7 +152,8 @@ func TestKafkaFactoryDoesNotLogPassword(t *testing.T) { require.NoError(t, err) logger.Sync() - require.NotContains(t, logbuf.String(), "SECRET", "log output must not contain password in clear text") + assert.NotContains(t, logbuf.String(), "SECRET", "log output must not contain password in clear text") + assert.NoError(t, f.Close()) }) } } diff --git a/plugin/storage/kafka/package_test.go b/plugin/storage/kafka/package_test.go new file mode 100644 index 00000000000..b637ce3a2ca --- /dev/null +++ b/plugin/storage/kafka/package_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/plugin/storage/kafka/writer_test.go b/plugin/storage/kafka/writer_test.go index 3c65b9d570a..e5cfa55cfec 100644 --- a/plugin/storage/kafka/writer_test.go +++ b/plugin/storage/kafka/writer_test.go @@ -77,6 +77,7 @@ var _ spanstore.Writer = &SpanWriter{} func withSpanWriter(t *testing.T, fn func(span *model.Span, w *spanWriterTest)) { serviceMetrics := metricstest.NewFactory(100 * time.Millisecond) + defer serviceMetrics.Stop() saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true producer := saramaMocks.NewAsyncProducer(t, saramaConfig) diff --git a/plugin/storage/memory/factory_test.go b/plugin/storage/memory/factory_test.go index 20f598abfe2..f711ce72ae3 100644 --- a/plugin/storage/memory/factory_test.go +++ b/plugin/storage/memory/factory_test.go @@ -74,7 +74,9 @@ func TestPublishOpts(t *testing.T) { f.InitFromViper(v, zap.NewNop()) baseMetrics := metricstest.NewFactory(time.Second) + defer baseMetrics.Stop() forkFactory := metricstest.NewFactory(time.Second) + defer forkFactory.Stop() metricsFactory := fork.New("internal", forkFactory, baseMetrics) assert.NoError(t, f.Initialize(metricsFactory, zap.NewNop())) diff --git a/plugin/storage/memory/package_test.go b/plugin/storage/memory/package_test.go new file mode 100644 index 00000000000..a6c09a8622d --- /dev/null +++ b/plugin/storage/memory/package_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +}