From 2d8cec1cc7e074e2ae5c4b5aff7d713e61579b93 Mon Sep 17 00:00:00 2001 From: Bill Havanki Date: Mon, 8 Nov 2021 15:24:40 -0500 Subject: [PATCH 1/3] Fix go vet issues: example tests, returning a mutex copy Example tests are renamed to fit required naming conventions. This caused them to be relocated in generated docs. Also, a minor problem with writer.newBatchQueue is fixed, in order to avoid an unnecessary copy of a newly created object holding a mutex. --- example_consumergroup_test.go | 4 ++-- example_groupbalancer_test.go | 10 +++++----- writer.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/example_consumergroup_test.go b/example_consumergroup_test.go index adbf940eb..aec5b1907 100644 --- a/example_consumergroup_test.go +++ b/example_consumergroup_test.go @@ -8,7 +8,7 @@ import ( "github.com/segmentio/kafka-go" ) -func ExampleConsumerGroupParallelReaders() { +func ExampleGeneration_Start_consumerGroupParallelReaders() { group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{ ID: "my-group", Brokers: []string{"kafka:9092"}, @@ -60,7 +60,7 @@ func ExampleConsumerGroupParallelReaders() { } } -func ExampleConsumerGroupOverwriteOffsets() { +func ExampleGeneration_CommitOffsets_overwriteOffsets() { group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{ ID: "my-group", Brokers: []string{"kafka:9092"}, diff --git a/example_groupbalancer_test.go b/example_groupbalancer_test.go index 48f44f8a8..e81cb4943 100644 --- a/example_groupbalancer_test.go +++ b/example_groupbalancer_test.go @@ -10,11 +10,11 @@ import ( "time" ) -// ExampleAWSRackLocal shows how the RackAffinityGroupBalancer can be used to -// pair up consumers with brokers in the same AWS availability zone. This code -// assumes that each brokers' rack is configured to be the name of the AZ in -// which it is running. -func ExampleAWSRackLocal() { +// ExampleNewReader_rackAffinity shows how the RackAffinityGroupBalancer can be +// used to pair up consumers with brokers in the same AWS availability zone. +// This code assumes that each brokers' rack is configured to be the name of the +// AZ in which it is running. +func ExampleNewReader_rackAffinity() { r := NewReader(ReaderConfig{ Brokers: []string{"kafka:9092"}, GroupID: "my-group", diff --git a/writer.go b/writer.go index 8fc34c4f0..a1308eb16 100644 --- a/writer.go +++ b/writer.go @@ -912,14 +912,14 @@ func (b *batchQueue) Close() { b.closed = true } -func newBatchQueue(initialSize int) batchQueue { +func newBatchQueue(initialSize int) *batchQueue { bq := batchQueue{ queue: make([]*writeBatch, 0, initialSize), } bq.cond.L = &bq.mutex - return bq + return &bq } // partitionWriter is a writer for a topic-partion pair. It maintains messaging order @@ -941,7 +941,7 @@ type partitionWriter struct { func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter { writer := &partitionWriter{ meta: key, - queue: newBatchQueue(10), + queue: *newBatchQueue(10), w: w, } go func() { From 7ab66fe57574bbd2303f65b79d652ebbc77d89d1 Mon Sep 17 00:00:00 2001 From: Bill Havanki Date: Mon, 15 Nov 2021 15:23:43 -0500 Subject: [PATCH 2/3] Rework mutex copying issue Instead of having newBatchQueue return a pointer to the entire batchQueue struct, instead make the mutex field within it a pointer, so that copying on return doesn't copy the mutex itself. The same treatment is needed for the cond field; the same no-copy rule applies to it. --- writer.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/writer.go b/writer.go index a1308eb16..762fc95d9 100644 --- a/writer.go +++ b/writer.go @@ -867,8 +867,8 @@ func (w *Writer) chooseTopic(msg Message) (string, error) { type batchQueue struct { queue []*writeBatch - mutex sync.Mutex - cond sync.Cond + mutex *sync.Mutex + cond *sync.Cond closed bool } @@ -912,14 +912,16 @@ func (b *batchQueue) Close() { b.closed = true } -func newBatchQueue(initialSize int) *batchQueue { +func newBatchQueue(initialSize int) batchQueue { bq := batchQueue{ queue: make([]*writeBatch, 0, initialSize), + mutex: &sync.Mutex{}, + cond: &sync.Cond{}, } - bq.cond.L = &bq.mutex + bq.cond.L = bq.mutex - return &bq + return bq } // partitionWriter is a writer for a topic-partion pair. It maintains messaging order @@ -941,7 +943,7 @@ type partitionWriter struct { func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter { writer := &partitionWriter{ meta: key, - queue: *newBatchQueue(10), + queue: newBatchQueue(10), w: w, } go func() { From a8136d4a2166d5f771aeb5233c9ad6b1dd0526ba Mon Sep 17 00:00:00 2001 From: Bill Havanki Date: Tue, 7 Dec 2021 11:31:41 -0500 Subject: [PATCH 3/3] Add comment about reverting to non-pointers in the future --- writer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/writer.go b/writer.go index 762fc95d9..b02d6dd13 100644 --- a/writer.go +++ b/writer.go @@ -867,6 +867,9 @@ func (w *Writer) chooseTopic(msg Message) (string, error) { type batchQueue struct { queue []*writeBatch + // Pointers are used here to make `go vet` happy, and avoid copying mutexes. + // It may be better to revert these to non-pointers and avoid the copies in + // a different way. mutex *sync.Mutex cond *sync.Cond