Skip to content

Commit

Permalink
[chore][receiver/googlecloudpubsubreceiver] Fix goroutines leak (#37311)
Browse files Browse the repository at this point in the history
#### Description
Fixes goroutines leak by properly closing the underlying gRPC client
which is only when we're using an insecure custom endpoint.
Enables shutdown tests.

#### Link to tracking issue
Related to
#30438

Co-authored-by: Kevin N. <6809505+kevinnoel-be@users.noreply.github.com>
  • Loading branch information
alexvanboxel and kevinnoel-be authored Jan 20, 2025
1 parent b026f73 commit 0708844
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 47 deletions.
8 changes: 8 additions & 0 deletions .chloggen/fix-goleak-gcppubreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
change_type: bug_fix
component: googlecloudpubsubreceiver
note: Fix a goroutine leak during shutdown.
issues: [30438]
subtext: |
A goroutine leak was found in the googlecloudpubsubreceiver.
The goroutine leak was caused by the receiver not closing the underlying created gRPC client when using an insecure custom endpoint.
change_logs: []
53 changes: 53 additions & 0 deletions receiver/googlecloudpubsubreceiver/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion receiver/googlecloudpubsubreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
cloud.google.com/go/logging v1.13.0
cloud.google.com/go/pubsub v1.45.3
github.com/google/go-cmp v0.6.0
github.com/googleapis/gax-go/v2 v2.14.1
github.com/iancoleman/strcase v0.3.0
github.com/json-iterator/go v1.1.12
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -46,7 +47,6 @@ require (
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand Down
5 changes: 2 additions & 3 deletions receiver/googlecloudpubsubreceiver/internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"
"time"

pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand All @@ -27,7 +26,7 @@ type StreamHandler struct {
pushMessage func(ctx context.Context, message *pubsubpb.ReceivedMessage) error
acks []string
mutex sync.Mutex
client *pubsub.SubscriberClient
client SubscriberClient

clientID string
subscription string
Expand All @@ -53,7 +52,7 @@ func (handler *StreamHandler) ack(ackID string) {
func NewHandler(
ctx context.Context,
logger *zap.Logger,
client *pubsub.SubscriberClient,
client SubscriberClient,
clientID string,
subscription string,
callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error,
Expand Down
17 changes: 17 additions & 0 deletions receiver/googlecloudpubsubreceiver/internal/subscriber_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"

import (
"context"

"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"github.com/googleapis/gax-go/v2"
)

// subscriberClient subset of `pubsub.SubscriberClient`
type SubscriberClient interface {
Close() error
StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error)
}
2 changes: 0 additions & 2 deletions receiver/googlecloudpubsubreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ tests:
timeout: 20s
subscription: projects/my-project/subscriptions/otlp-subscription
skip_lifecycle: true
skip_shutdown: true
goleak:
skip: false
ignore:
# See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
top: go.opencensus.io/stats/view.(*worker).start
Expand Down
53 changes: 12 additions & 41 deletions receiver/googlecloudpubsubreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"
"time"

pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -24,11 +23,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
)
Expand All @@ -42,7 +36,7 @@ type pubsubReceiver struct {
logsConsumer consumer.Logs
userAgent string
config *Config
client *pubsub.SubscriberClient
client internal.SubscriberClient
tracesUnmarshaler ptrace.Unmarshaler
metricsUnmarshaler pmetric.Unmarshaler
logsUnmarshaler plog.Unmarshaler
Expand All @@ -68,34 +62,14 @@ const (
gZip = iota
)

func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOption) {
if receiver.userAgent != "" {
copts = append(copts, option.WithUserAgent(receiver.userAgent))
}
if receiver.config.Endpoint != "" {
if receiver.config.Insecure {
var dialOpts []grpc.DialOption
if receiver.userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent))
}
conn, _ := grpc.NewClient(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
copts = append(copts, option.WithGRPCConn(conn))
} else {
copts = append(copts, option.WithEndpoint(receiver.config.Endpoint))
}
}
return copts
}

func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error {
if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil {
return errors.New("cannot start receiver: no consumers were specified")
}

var startErr error
receiver.startOnce.Do(func() {
copts := receiver.generateClientOptions()
client, err := pubsub.NewSubscriberClient(ctx, copts...)
client, err := newSubscriberClient(ctx, receiver.config, receiver.userAgent)
if err != nil {
startErr = fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
return
Expand All @@ -115,21 +89,18 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err
}

func (receiver *pubsubReceiver) Shutdown(_ context.Context) error {
var err error
if receiver.client != nil {
// A canceled code means the client connection is already closed,
// Shutdown shouldn't return an error in that case.
if closeErr := receiver.client.Close(); status.Code(closeErr) != codes.Canceled {
err = closeErr
}
if receiver.handler != nil {
receiver.logger.Info("Stopping Google Pubsub receiver")
receiver.handler.CancelNow()
receiver.logger.Info("Stopped Google Pubsub receiver")
receiver.handler = nil
}
if receiver.handler == nil {
return err
if receiver.client == nil {
return nil
}
receiver.logger.Info("Stopping Google Pubsub receiver")
receiver.handler.CancelNow()
receiver.logger.Info("Stopped Google Pubsub receiver")
return err
client := receiver.client
receiver.client = nil
return client.Close()
}

func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type": "service_account",
"private_key_id": "abc",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n",
"client_email": "123-abc@developer.gserviceaccount.com",
"client_id": "123-abc.apps.googleusercontent.com",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "http://localhost:8080/token"
}
76 changes: 76 additions & 0 deletions receiver/googlecloudpubsubreceiver/wrapped_subscriber_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package googlecloudpubsubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver"

import (
"context"
"fmt"

pubsub "cloud.google.com/go/pubsub/apiv1"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
)

// wrappedSubscriberClient allows to override the close function
type wrappedSubscriberClient struct {
internal.SubscriberClient
closeFn func() error
}

func (c *wrappedSubscriberClient) Close() error {
if c.closeFn != nil {
return c.closeFn()
}
return c.SubscriberClient.Close()
}

func newSubscriberClient(ctx context.Context, config *Config, userAgent string) (internal.SubscriberClient, error) {
clientOptions, closeFn, err := generateClientOptions(config, userAgent)
if err != nil {
return nil, fmt.Errorf("failed preparing the gRPC client options to PubSub: %w", err)
}

client, err := pubsub.NewSubscriberClient(ctx, clientOptions...)
if err != nil {
return nil, fmt.Errorf("failed creating the gRPC client to PubSub: %w", err)
}

if closeFn == nil {
return client, nil
}

return &wrappedSubscriberClient{
SubscriberClient: client,
closeFn: closeFn,
}, nil
}

func generateClientOptions(config *Config, userAgent string) ([]option.ClientOption, func() error, error) {
var copts []option.ClientOption
var closeFn func() error

if userAgent != "" {
copts = append(copts, option.WithUserAgent(userAgent))
}
if config.Endpoint != "" {
if config.Insecure {
var dialOpts []grpc.DialOption
if userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
}
client, err := grpc.NewClient(config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return nil, nil, err
}
copts = append(copts, option.WithGRPCConn(client))
closeFn = client.Close // we need to be able to properly close the grpc client otherwise it'll leak goroutines
} else {
copts = append(copts, option.WithEndpoint(config.Endpoint))
}
}
return copts, closeFn, nil
}
Loading

0 comments on commit 0708844

Please sign in to comment.