Skip to content

Commit

Permalink
contrib/confluentinc/confluent-kafka-go: fix goroutine leak in Produce (
Browse files Browse the repository at this point in the history
#2924)

Co-authored-by: Dario Castañé <dario.castane@datadoghq.com>
  • Loading branch information
rarguelloF and darccio committed Oct 23, 2024
1 parent b6aed52 commit a0e7ab6
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 176 deletions.
34 changes: 23 additions & 11 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,31 +334,43 @@ func (p *Producer) Close() {
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
span := p.startSpan(msg)

var errChan chan error

// if the user has selected a delivery channel, we will wrap it and
// wait for the delivery event to finish the span
// wait for the delivery event to finish the span.
// in case the Produce call returns an error, we won't receive anything
// in the deliveryChan, so we use errChan to make sure this goroutine exits.
if deliveryChan != nil {
errChan = make(chan error, 1)
oldDeliveryChan := deliveryChan
deliveryChan = make(chan kafka.Event)
go func() {
var err error
evt := <-deliveryChan
if msg, ok := evt.(*kafka.Message); ok {
// delivery errors are returned via TopicPartition.Error
err = msg.TopicPartition.Error
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
select {
case evt := <-deliveryChan:
if msg, ok := evt.(*kafka.Message); ok {
// delivery errors are returned via TopicPartition.Error
err = msg.TopicPartition.Error
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
}
oldDeliveryChan <- evt

case e := <-errChan:
err = e
}
span.Finish(tracer.WithError(err))
oldDeliveryChan <- evt
}()
}

setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
err := p.Producer.Produce(msg, deliveryChan)
// with no delivery channel or enqueue error, finish immediately
if err != nil || deliveryChan == nil {
span.Finish(tracer.WithError(err))
if err != nil {
if deliveryChan != nil {
errChan <- err
} else {
span.Finish(tracer.WithError(err))
}
}

return err
}

Expand Down
199 changes: 122 additions & 77 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,89 +23,14 @@ import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

var (
testGroupID = "gotest"
testTopic = "gotest"
)

type consumerActionFn func(c *Consumer) (*kafka.Message, error)

func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("to enable integration test, set the INTEGRATION environment variable")
}
mt := mocktracer.Start()
defer mt.Stop()

// first write a message to the topic
p, err := NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:9092",
"go.delivery.reports": true,
}, producerOpts...)
require.NoError(t, err)

delivery := make(chan kafka.Event, 1)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 0,
},
Key: []byte("key2"),
Value: []byte("value2"),
}, delivery)
require.NoError(t, err)

msg1, _ := (<-delivery).(*kafka.Message)
p.Close()

// next attempt to consume the message
c, err := NewConsumer(&kafka.ConfigMap{
"group.id": testGroupID,
"bootstrap.servers": "127.0.0.1:9092",
"fetch.wait.max.ms": 500,
"socket.timeout.ms": 1500,
"session.timeout.ms": 1500,
"enable.auto.offset.store": false,
}, consumerOpts...)
require.NoError(t, err)

err = c.Assign([]kafka.TopicPartition{
{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
})
require.NoError(t, err)

msg2, err := consumerAction(c)
require.NoError(t, err)
_, err = c.CommitMessage(msg2)
require.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
err = c.Close()
require.NoError(t, err)

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
m := make(map[string]struct{})
for _, b := range backlogs {
m[strings.Join(b.Tags, "")] = struct{}{}
}
return m
}
backlogsMap := toMap(backlogs)
require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
}
return spans, msg2
}

func TestConsumerChannel(t *testing.T) {
// we can test consuming via the Events channel by artifically sending
// messages. Testing .Poll is done via an integration test.
Expand Down Expand Up @@ -320,7 +245,7 @@ func TestCustomTags(t *testing.T) {
"socket.timeout.ms": 10,
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
}, WithCustomTag("foo", func(msg *kafka.Message) interface{} {
}, WithCustomTag("foo", func(_ *kafka.Message) interface{} {
return "bar"
}), WithCustomTag("key", func(msg *kafka.Message) interface{} {
return msg.Key
Expand Down Expand Up @@ -370,3 +295,123 @@ func TestNamingSchema(t *testing.T) {
}
namingschematest.NewKafkaTest(genSpans)(t)
}

// Test we don't leak goroutines and properly close the span when Produce returns an error.
func TestProduceError(t *testing.T) {
defer func() {
err := goleak.Find()
if err != nil {
// if a goroutine is leaking, ensure it is not coming from this package
assert.NotContains(t, err.Error(), "contrib/confluentinc/confluent-kafka-go")
}
}()

mt := mocktracer.Start()
defer mt.Stop()

// first write a message to the topic
p, err := NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:9092",
"go.delivery.reports": true,
})
require.NoError(t, err)
defer p.Close()

// this empty message should cause an error in the Produce call.
topic := ""
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
}
deliveryChan := make(chan kafka.Event, 1)
err = p.Produce(msg, deliveryChan)
require.Error(t, err)
require.EqualError(t, err, "Local: Invalid argument or configuration")

select {
case <-deliveryChan:
assert.Fail(t, "there should be no events in the deliveryChan")
case <-time.After(1 * time.Second):
// assume there is no event
}

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
}

type consumerActionFn func(c *Consumer) (*kafka.Message, error)

func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("to enable integration test, set the INTEGRATION environment variable")
}
mt := mocktracer.Start()
defer mt.Stop()

// first write a message to the topic
p, err := NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:9092",
"go.delivery.reports": true,
}, producerOpts...)
require.NoError(t, err)

delivery := make(chan kafka.Event, 1)
err = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 0,
},
Key: []byte("key2"),
Value: []byte("value2"),
}, delivery)
require.NoError(t, err)

msg1, _ := (<-delivery).(*kafka.Message)
p.Close()

// next attempt to consume the message
c, err := NewConsumer(&kafka.ConfigMap{
"group.id": testGroupID,
"bootstrap.servers": "127.0.0.1:9092",
"fetch.wait.max.ms": 500,
"socket.timeout.ms": 1500,
"session.timeout.ms": 1500,
"enable.auto.offset.store": false,
}, consumerOpts...)
require.NoError(t, err)

err = c.Assign([]kafka.TopicPartition{
{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
})
require.NoError(t, err)

msg2, err := consumerAction(c)
require.NoError(t, err)
_, err = c.CommitMessage(msg2)
require.NoError(t, err)
assert.Equal(t, msg1.String(), msg2.String())
err = c.Close()
require.NoError(t, err)

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(_ []internaldsm.Backlog) map[string]struct{} {
m := make(map[string]struct{})
for _, b := range backlogs {
m[strings.Join(b.Tags, "")] = struct{}{}
}
return m
}
backlogsMap := toMap(backlogs)
require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
}
return spans, msg2
}
34 changes: 23 additions & 11 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,31 +334,43 @@ func (p *Producer) Close() {
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
span := p.startSpan(msg)

var errChan chan error

// if the user has selected a delivery channel, we will wrap it and
// wait for the delivery event to finish the span
// wait for the delivery event to finish the span.
// in case the Produce call returns an error, we won't receive anything
// in the deliveryChan, so we use errChan to make sure this goroutine exits.
if deliveryChan != nil {
errChan = make(chan error, 1)
oldDeliveryChan := deliveryChan
deliveryChan = make(chan kafka.Event)
go func() {
var err error
evt := <-deliveryChan
if msg, ok := evt.(*kafka.Message); ok {
// delivery errors are returned via TopicPartition.Error
err = msg.TopicPartition.Error
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
select {
case evt := <-deliveryChan:
if msg, ok := evt.(*kafka.Message); ok {
// delivery errors are returned via TopicPartition.Error
err = msg.TopicPartition.Error
trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
}
oldDeliveryChan <- evt

case e := <-errChan:
err = e
}
span.Finish(tracer.WithError(err))
oldDeliveryChan <- evt
}()
}

setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
err := p.Producer.Produce(msg, deliveryChan)
// with no delivery channel or enqueue error, finish immediately
if err != nil || deliveryChan == nil {
span.Finish(tracer.WithError(err))
if err != nil {
if deliveryChan != nil {
errChan <- err
} else {
span.Finish(tracer.WithError(err))
}
}

return err
}

Expand Down
Loading

0 comments on commit a0e7ab6

Please sign in to comment.