From 6c70344bc549b62fc6316474fd6e1777d222ea69 Mon Sep 17 00:00:00 2001 From: Harshvir Potpose <122517264+akagami-harsh@users.noreply.github.com> Date: Wed, 3 Jan 2024 20:32:30 +0530 Subject: [PATCH] Fix goroutine leak in cmd/agent/app/reporter/grpc (#5075) ## Which problem is this PR solving? - part of #5006 ## Description of the changes - fixed goroutine leaks in `cmd/agent/app/reporter/grpc` and also added a goleak check ## How was this change tested? - go test ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` --------- Signed-off-by: Harshvir Potpose --- cmd/agent/app/proxy_builders.go | 5 +++- cmd/agent/app/reporter/grpc/builder.go | 26 ++++++++++------- cmd/agent/app/reporter/grpc/builder_test.go | 28 +++++++++++++++---- .../app/reporter/grpc/collector_proxy.go | 5 ++-- .../app/reporter/grpc/collector_proxy_test.go | 5 +++- cmd/agent/app/reporter/grpc/package_test.go | 25 +++++++++++++++++ cmd/agent/app/reporter/grpc/reporter_test.go | 5 ++-- 7 files changed, 77 insertions(+), 22 deletions(-) create mode 100644 cmd/agent/app/reporter/grpc/package_test.go diff --git a/cmd/agent/app/proxy_builders.go b/cmd/agent/app/proxy_builders.go index fd4015f48fe..893e36bc543 100644 --- a/cmd/agent/app/proxy_builders.go +++ b/cmd/agent/app/proxy_builders.go @@ -15,12 +15,15 @@ package app import ( + "context" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" ) // GRPCCollectorProxyBuilder creates CollectorProxyBuilder for GRPC reporter func GRPCCollectorProxyBuilder(builder *grpc.ConnBuilder) CollectorProxyBuilder { return func(opts ProxyBuilderOptions) (proxy CollectorProxy, err error) { - return grpc.NewCollectorProxy(builder, opts.AgentTags, opts.Metrics, opts.Logger) + ctx := context.Background() + return grpc.NewCollectorProxy(ctx, builder, opts.AgentTags, opts.Metrics, opts.Logger) } } diff --git a/cmd/agent/app/reporter/grpc/builder.go b/cmd/agent/app/reporter/grpc/builder.go index 9b6d8da499f..0ec32a5d2e3 100644 --- a/cmd/agent/app/reporter/grpc/builder.go +++ b/cmd/agent/app/reporter/grpc/builder.go @@ -58,7 +58,7 @@ func NewConnBuilder() *ConnBuilder { } // CreateConnection creates the gRPC connection -func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) { +func (b *ConnBuilder) CreateConnection(ctx context.Context, logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) { var dialOptions []grpc.DialOption var dialTarget string if b.TLS.Enabled { // user requested a secure connection @@ -115,16 +115,22 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Fact logger.Info("Checking connection to collector") for { - s := cc.GetState() - if s == connectivity.Ready { - cm.OnConnectionStatusChange(true) - cm.RecordTarget(cc.Target()) - } else { - cm.OnConnectionStatusChange(false) + select { + case <-ctx.Done(): + logger.Info("Stopping connection") + return + default: + s := cc.GetState() + if s == connectivity.Ready { + cm.OnConnectionStatusChange(true) + cm.RecordTarget(cc.Target()) + } else { + cm.OnConnectionStatusChange(false) + } + + logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s)) + cc.WaitForStateChange(ctx, s) } - - logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s)) - cc.WaitForStateChange(context.Background(), s) } }(conn, connectMetrics) diff --git a/cmd/agent/app/reporter/grpc/builder_test.go b/cmd/agent/app/reporter/grpc/builder_test.go index 994f62f77ef..572ec320d6f 100644 --- a/cmd/agent/app/reporter/grpc/builder_test.go +++ b/cmd/agent/app/reporter/grpc/builder_test.go @@ -59,8 +59,11 @@ func TestBuilderFromConfig(t *testing.T) { t, []string{"127.0.0.1:14268", "127.0.0.1:14269"}, cfg.CollectorHostPorts) - r, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + r, err := cfg.CreateConnection(ctx, zap.NewNop(), metrics.NullFactory) require.NoError(t, err) + defer r.Close() assert.NotNil(t, r) } @@ -149,9 +152,12 @@ func TestBuilderWithCollectors(t *testing.T) { cfg.Notifier = test.notifier cfg.Discoverer = test.discoverer - conn, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + conn, err := cfg.CreateConnection(ctx, zap.NewNop(), metrics.NullFactory) if test.expectedError == "" { require.NoError(t, err) + defer conn.Close() require.NotNil(t, conn) if test.checkConnectionState { assertConnectionState(t, conn, test.expectedState) @@ -207,10 +213,12 @@ func TestProxyBuilder(t *testing.T) { expectError: false, }, } - + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for _, test := range tests { t.Run(test.name, func(t *testing.T) { - proxy, err := NewCollectorProxy(test.grpcBuilder, nil, metrics.NullFactory, zap.NewNop()) + proxy, err := NewCollectorProxy(ctx, test.grpcBuilder, nil, metrics.NullFactory, zap.NewNop()) + if test.expectError { require.Error(t, err) } else { @@ -333,6 +341,8 @@ func TestProxyClientTLS(t *testing.T) { expectError: false, }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for _, test := range tests { t.Run(test.name, func(t *testing.T) { var opts []grpc.ServerOption @@ -342,6 +352,7 @@ func TestProxyClientTLS(t *testing.T) { opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))} } + defer test.serverTLS.Close() spanHandler := &mockSpanHandler{} s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { api_v2.RegisterCollectorServiceServer(s, spanHandler) @@ -349,6 +360,7 @@ func TestProxyClientTLS(t *testing.T) { defer s.Stop() mFactory := metricstest.NewFactory(time.Microsecond) + defer mFactory.Stop() _, port, _ := net.SplitHostPort(addr.String()) grpcBuilder := &ConnBuilder{ @@ -356,6 +368,7 @@ func TestProxyClientTLS(t *testing.T) { TLS: test.clientTLS, } proxy, err := NewCollectorProxy( + ctx, grpcBuilder, nil, mFactory, @@ -369,7 +382,7 @@ func TestProxyClientTLS(t *testing.T) { r := proxy.GetReporter() - err = r.EmitBatch(context.Background(), &jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}}) + err = r.EmitBatch(ctx, &jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}}) if test.expectError { require.Error(t, err) @@ -418,8 +431,11 @@ func TestBuilderWithAdditionalDialOptions(t *testing.T) { AdditionalDialOptions: []grpc.DialOption{grpc.WithUnaryInterceptor(fi.intercept)}, } - r, err := cb.CreateConnection(zap.NewNop(), metrics.NullFactory) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + r, err := cb.CreateConnection(ctx, zap.NewNop(), metrics.NullFactory) require.NoError(t, err) + defer r.Close() assert.NotNil(t, r) err = r.Invoke(context.Background(), "test", map[string]string{}, map[string]string{}, []grpc.CallOption{}...) diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index 829cd98625b..ec41f729b06 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -15,6 +15,7 @@ package grpc import ( + "context" "errors" "io" @@ -36,8 +37,8 @@ type ProxyBuilder struct { } // NewCollectorProxy creates ProxyBuilder -func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { - conn, err := builder.CreateConnection(logger, mFactory) +func NewCollectorProxy(ctx context.Context, builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { + conn, err := builder.CreateConnection(ctx, logger, mFactory) if err != nil { return nil, err } diff --git a/cmd/agent/app/reporter/grpc/collector_proxy_test.go b/cmd/agent/app/reporter/grpc/collector_proxy_test.go index 0ce2baa785a..8f3d1675722 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy_test.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy_test.go @@ -46,7 +46,10 @@ func TestMultipleCollectors(t *testing.T) { defer s2.Stop() mFactory := metricstest.NewFactory(time.Microsecond) - proxy, err := NewCollectorProxy(&ConnBuilder{CollectorHostPorts: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop()) + defer mFactory.Stop() + ctx, cancel := context.WithCancel(context.Background()) + cancel() + proxy, err := NewCollectorProxy(ctx, &ConnBuilder{CollectorHostPorts: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop()) require.NoError(t, err) require.NotNil(t, proxy) assert.NotNil(t, proxy.GetReporter()) diff --git a/cmd/agent/app/reporter/grpc/package_test.go b/cmd/agent/app/reporter/grpc/package_test.go new file mode 100644 index 00000000000..5c91dca838a --- /dev/null +++ b/cmd/agent/app/reporter/grpc/package_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2024 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/cmd/agent/app/reporter/grpc/reporter_test.go b/cmd/agent/app/reporter/grpc/reporter_test.go index 6ee8f970cbf..c15ed85c4d8 100644 --- a/cmd/agent/app/reporter/grpc/reporter_test.go +++ b/cmd/agent/app/reporter/grpc/reporter_test.go @@ -61,8 +61,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) { defer s.Stop() conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) //nolint:staticcheck // don't care about errors - defer conn.Close() require.NoError(t, err) + defer conn.Close() rep := NewReporter(conn, nil, zap.NewNop()) @@ -104,8 +104,8 @@ func TestReporter_EmitBatch(t *testing.T) { defer s.Stop() conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) //nolint:staticcheck // don't care about errors - defer conn.Close() require.NoError(t, err) + defer conn.Close() rep := NewReporter(conn, nil, zap.NewNop()) tm := time.Unix(158, 0) @@ -133,6 +133,7 @@ func TestReporter_EmitBatch(t *testing.T) { func TestReporter_SendFailure(t *testing.T) { conn, err := grpc.Dial("invalid-host-name-blah:12345", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) + defer conn.Close() rep := NewReporter(conn, nil, zap.NewNop()) err = rep.send(context.Background(), nil, nil) require.Error(t, err)