Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data race in broker v1.36.0 #2320

Closed
samuelhewitt opened this issue Aug 22, 2022 · 12 comments · Fixed by #2409 or #2428
Closed

Data race in broker v1.36.0 #2320

samuelhewitt opened this issue Aug 22, 2022 · 12 comments · Fixed by #2409 or #2428

Comments

@samuelhewitt
Copy link

samuelhewitt commented Aug 22, 2022

Versions
Sarama v1.36.0, Kafka 2.8.1.0, Go 1.19
Configuration
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_1_0
	cfg.Consumer.Return.Errors = true
Sample Test code
code: CLICK ME

package saramatests

import (
	"context"
	"errors"
	"log"
	"testing"
	"time"

	"github.com/Shopify/sarama"
	"github.com/richardwilkes/toolbox/xio"
)

// testConsumerGroupHandler is a do-nothing sarama.ConsumerGroupHandler; the
// race reproduction only needs something to hand to ConsumerGroup.Consume.
type testConsumerGroupHandler struct{}

// Cleanup has nothing to release and always reports success.
func (h *testConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim reads messages from the claim and passes each one to
// handleUpdates. It returns nil once the Messages channel is closed or the
// session's context is done, and returns early with the first error that
// handleUpdates reports.
func (h *testConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-session.Context().Done():
			return nil
		case msg, open := <-claim.Messages():
			if !open {
				return nil
			}
			if err := h.handleUpdates(session, msg); err != nil {
				return err
			}
		}
	}
}

// Setup has nothing to prepare and always reports success.
func (h *testConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// handleUpdates is a placeholder message processor: it ignores both the
// session and the message and never fails.
func (h *testConsumerGroupHandler) handleUpdates(_ sarama.ConsumerGroupSession, _ *sarama.ConsumerMessage) error {
	return nil
}

// Fixture identifiers shared by the mock-broker setup and the test body.
const (
	groupID     = "test-group"
	topic       = "test-topic"
	offsetStart = int64(1234)
)

// TestConsume_RaceTest reproduces the broker data race reported in this issue.
//
// The mock broker answers MetadataRequest with a response that only lists a
// "mismatched-topic" carrying ErrUnknownTopicOrPartition (and, per the captured
// logs, fails to decode on the client side), so sarama.NewConsumerGroup keeps
// failing. The test retries client creation with a doubling back-off until the
// 4-second deadline on cancelCtx expires. It is meant to be run with -race.
func TestConsume_RaceTest(t *testing.T) {
	// sarama.Logger is package-global; restore it afterwards so this test does
	// not leak its logger into other tests in the package.
	prevLogger := sarama.Logger
	sarama.Logger = log.Default()
	t.Cleanup(func() { sarama.Logger = prevLogger })

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_1_0
	cfg.Consumer.Return.Errors = true

	seedBroker := sarama.NewMockBroker(t, 1)
	// Close the broker on every exit path, including a t.Fatal during setup.
	defer seedBroker.Close()

	joinGroupResponse := &sarama.JoinGroupResponse{}

	syncGroupResponse := &sarama.SyncGroupResponse{
		Version: 3, // sarama > 2.3.0.0 uses version 3
	}
	// Leverage mock response to get the MemberAssignment bytes
	mockSyncGroupResponse := sarama.NewMockSyncGroupResponse(t).SetMemberAssignment(&sarama.ConsumerGroupMemberAssignment{
		Version:  1,
		Topics:   map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
		UserData: []byte{0x01},
	})
	syncGroupResponse.MemberAssignment = mockSyncGroupResponse.MemberAssignment

	heartbeatResponse := &sarama.HeartbeatResponse{
		Err: sarama.ErrNoError,
	}
	offsetFetchResponse := &sarama.OffsetFetchResponse{
		Version:        1,
		ThrottleTimeMs: 0,
		Err:            sarama.ErrNoError,
	}
	offsetFetchResponse.AddBlock(topic, 0, &sarama.OffsetFetchResponseBlock{
		Offset:      offsetStart,
		LeaderEpoch: 0,
		Metadata:    "",
		Err:         sarama.ErrNoError})

	offsetResponse := &sarama.OffsetResponse{
		Version: 1,
	}
	offsetResponse.AddTopicPartition(topic, 0, offsetStart)

	metadataResponse := new(sarama.MetadataResponse)
	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
	metadataResponse.AddTopic("mismatched-topic", sarama.ErrUnknownTopicOrPartition)

	handlerMap := map[string]sarama.MockResponse{
		"ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t),
		"MetadataRequest":    sarama.NewMockSequence(metadataResponse),
		"OffsetRequest":      sarama.NewMockSequence(offsetResponse),
		"OffsetFetchRequest": sarama.NewMockSequence(offsetFetchResponse),
		"FindCoordinatorRequest": sarama.NewMockSequence(sarama.NewMockFindCoordinatorResponse(t).
			SetCoordinator(sarama.CoordinatorGroup, groupID, seedBroker)),
		"JoinGroupRequest": sarama.NewMockSequence(joinGroupResponse),
		"SyncGroupRequest": sarama.NewMockSequence(syncGroupResponse),
		"HeartbeatRequest": sarama.NewMockSequence(heartbeatResponse),
	}
	seedBroker.SetHandlerByMap(handlerMap)

	handler := &testConsumerGroupHandler{}
	cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
	// Release the context on every exit path, not only when the test succeeds.
	defer cancel()

	retryWait := time.Second
	var client sarama.ConsumerGroup
	var err error
	clientRetries := 0
	for {
		client, err = sarama.NewConsumerGroup([]string{seedBroker.Addr()}, groupID, cfg)
		if err == nil {
			break
		}

		// Doubling back-off; it stops growing once the wait is at least a minute.
		if retryWait < time.Minute {
			retryWait *= 2
		}

		clientRetries++

		// The sleep error overwrites the NewConsumerGroup error. It presumably
		// wraps cancelCtx's error once the deadline passes; that is what the
		// DeadlineExceeded check below expects.
		if err = xio.ContextSleep(cancelCtx, retryWait); err != nil {
			break
		}
	}
	if err == nil {
		// With the mocked metadata above, creating the consumer group is
		// expected to keep failing until the deadline. Report the failure with
		// Errorf rather than Fatalf so the Consume call is actually reachable,
		// and make sure the client is released before the broker is closed.
		t.Errorf("should not proceed to Consume")
		defer func() {
			if cerr := client.Close(); cerr != nil {
				t.Logf("closing consumer group: %v", cerr)
			}
		}()
		err = client.Consume(cancelCtx, []string{topic}, handler)
	}

	if clientRetries <= 0 {
		t.Errorf("clientRetries = %v; want > 0", clientRetries)
	}

	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
		t.Fatal(err)
	}
}

Logs
logs: CLICK ME

==================
WARNING: DATA RACE
Write at 0x00c0001af500 by goroutine 34:
  github.com/Shopify/sarama.(*Broker).Open.func1()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:208 +0x9ad
  github.com/Shopify/sarama.withRecover()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/utils.go:43 +0x48
  github.com/Shopify/sarama.(*Broker).Open.func2()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:177 +0x39

Previous read at 0x00c0001af500 by goroutine 15:
  github.com/Shopify/sarama.(*Broker).handleResponsePromise()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:1032 +0x215
  github.com/Shopify/sarama.(*Broker).sendAndReceive()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:1026 +0xbc
  github.com/Shopify/sarama.(*Broker).ApiVersions()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:598 +0x5c
  github.com/Shopify/sarama.(*Broker).Open.func1.1()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:185 +0x131
  runtime.deferreturn()
      C:/Program Files/Go/src/runtime/panic.go:476 +0x32
  github.com/Shopify/sarama.withRecover()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/utils.go:43 +0x48
  github.com/Shopify/sarama.(*Broker).Open.func2()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:177 +0x39

Goroutine 34 (running) created at:
  github.com/Shopify/sarama.(*Broker).Open()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:177 +0x406
  github.com/Shopify/sarama.(*client).anyBroker()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:699 +0x14a
  github.com/Shopify/sarama.(*client).tryRefreshMetadata()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:898 +0x17e
  github.com/Shopify/sarama.(*client).tryRefreshMetadata.func2()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:893 +0x2c4
  github.com/Shopify/sarama.(*client).tryRefreshMetadata()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:968 +0xfe1
  github.com/Shopify/sarama.(*client).RefreshMetadata()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:499 +0x213
  github.com/Shopify/sarama.NewClient()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:174 +0x4aa
  github.com/Shopify/sarama.NewConsumerGroup()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/consumer_group.go:98 +0x5e

Goroutine 15 (finished) created at:
  github.com/Shopify/sarama.(*Broker).Open()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/broker.go:177 +0x406
  github.com/Shopify/sarama.(*client).anyBroker()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:699 +0x14a
  github.com/Shopify/sarama.(*client).tryRefreshMetadata()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:898 +0x17e
  github.com/Shopify/sarama.(*client).RefreshMetadata()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:499 +0x213
  github.com/Shopify/sarama.NewClient()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/client.go:174 +0x4aa
  github.com/Shopify/sarama.NewConsumerGroup()
      C:/Users/shewitt/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/consumer_group.go:98 +0x5e

==================
2022/08/22 10:30:50 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:50 *** mockbroker/1/1: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:50 *** mockbroker/1/1: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:50 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:50 *** mockbroker/1/1: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:50 *** mockbroker/1/1: connection closed, err=<nil>
2022/08/22 10:30:50 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:50 client/metadata no available broker to send metadata request to
2022/08/22 10:30:50 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:51 client/metadata retrying after 250ms... (2 attempts remaining)
2022/08/22 10:30:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:51 client/metadata fetching metadata for all topics from broker 127.0.0.1:63788
2022/08/22 10:30:51 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:51 *** mockbroker/1/2: connection opened
2022/08/22 10:30:51 *** mockbroker/1/2: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:51 *** mockbroker/1/2: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:51 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:51 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:51 *** mockbroker/1/2: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:51 *** mockbroker/1/2: connection closed, err=<nil>
2022/08/22 10:30:51 client/metadata no available broker to send metadata request to
2022/08/22 10:30:51 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:51 client/metadata retrying after 250ms... (1 attempts remaining)
2022/08/22 10:30:51 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:51 client/metadata fetching metadata for all topics from broker 127.0.0.1:63788
2022/08/22 10:30:51 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:51 *** mockbroker/1/3: connection opened
2022/08/22 10:30:51 *** mockbroker/1/3: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:51 *** mockbroker/1/3: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:51 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:51 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:51 client/metadata no available broker to send metadata request to
2022/08/22 10:30:51 *** mockbroker/1/3: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:51 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:51 *** mockbroker/1/3: connection closed, err=<nil>
2022/08/22 10:30:51 Closing Client
2022/08/22 10:30:53 Initializing new client
2022/08/22 10:30:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:53 client/metadata fetching metadata for all topics from broker 127.0.0.1:63788
2022/08/22 10:30:53 *** mockbroker/1/4: connection opened
2022/08/22 10:30:53 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:53 *** mockbroker/1/4: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:53 *** mockbroker/1/4: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:53 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:53 *** mockbroker/1/4: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:53 *** mockbroker/1/4: connection closed, err=<nil>
2022/08/22 10:30:53 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:53 client/metadata no available broker to send metadata request to
2022/08/22 10:30:53 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:53 client/metadata retrying after 250ms... (3 attempts remaining)
2022/08/22 10:30:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:53 client/metadata fetching metadata for all topics from broker 127.0.0.1:63788
2022/08/22 10:30:53 *** mockbroker/1/5: connection opened
2022/08/22 10:30:53 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:53 *** mockbroker/1/5: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:53 *** mockbroker/1/5: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:53 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:53 *** mockbroker/1/5: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:53 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:53 *** mockbroker/1/5: connection closed, err=<nil>
2022/08/22 10:30:53 client/metadata no available broker to send metadata request to
2022/08/22 10:30:53 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:53 client/metadata retrying after 250ms... (2 attempts remaining)
2022/08/22 10:30:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:53 client/metadata fetching metadata for all topics from broker 127.0.0.1:63788
2022/08/22 10:30:53 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:53 *** mockbroker/1/6: connection opened
2022/08/22 10:30:53 *** mockbroker/1/6: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:53 *** mockbroker/1/6: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:53 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:53 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:53 *** mockbroker/1/6: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:53 *** mockbroker/1/6: connection closed, err=<nil>
2022/08/22 10:30:53 client/metadata no available broker to send metadata request to
2022/08/22 10:30:53 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:54 client/metadata retrying after 250ms... (1 attempts remaining)
2022/08/22 10:30:54 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2022/08/22 10:30:54 client/metadata fetching metadata for all topics from broker 127.0.0.1:63788
2022/08/22 10:30:54 *** mockbroker/1/7: connection opened
2022/08/22 10:30:54 Connected to broker at 127.0.0.1:63788 (unregistered)
2022/08/22 10:30:54 *** mockbroker/1/7: replied to *sarama.ApiVersionsRequest with *sarama.ApiVersionsResponse
-> (*sarama.ApiVersionsRequest){Version:(int16)3 ClientSoftwareName:(string)sarama ClientSoftwareVersion:(string)dev}
-> (*sarama.ApiVersionsResponse){Version:(int16)3 ErrorCode:(int16)0 ApiKeys:([]sarama.ApiVersionsResponseKey)[<max>] ThrottleTimeMs:(int32)0}
2022/08/22 10:30:54 *** mockbroker/1/7: replied to *sarama.MetadataRequest with *sarama.MetadataResponse
-> (*sarama.MetadataRequest){Version:(int16)5 Topics:([]string)<nil> AllowAutoTopicCreation:(bool)false}
-> (*sarama.MetadataResponse){Version:(int16)0 ThrottleTimeMs:(int32)0 Brokers:([]*sarama.Broker)[<max>] ClusterID:(*string)<nil> ControllerID:(int32)0 Topics:([]*sarama.TopicMetadata)[<max>]}
2022/08/22 10:30:54 client/metadata got error from broker -1 while fetching metadata: kafka: insufficient data to decode packet, more bytes expected
2022/08/22 10:30:54 Closed connection to broker 127.0.0.1:63788
2022/08/22 10:30:54 client/metadata no available broker to send metadata request to
2022/08/22 10:30:54 client/brokers resurrecting 1 dead seed brokers
2022/08/22 10:30:54 *** mockbroker/1/7: invalid request: err=EOF, ([]uint8) <nil>
2022/08/22 10:30:54 *** mockbroker/1/7: connection closed, err=<nil>
2022/08/22 10:30:54 Closing Client
2022/08/22 10:30:54 *** mockbroker/1: listener closed, err=accept tcp 127.0.0.1:63788: use of closed network connection

Problem Description

A unit test that confirms a ConsumerGroup wrapper successfully cancels after a context timeout began failing due to a data race when moving from v1.35.0 to v1.36.0.
In broker.go there are several reads of b.conf.MetricRegistry (the test in question fails in handleResponsePromise), and none of them are guarded by a lock. Broker.Open() writes to b.conf, which creates the race condition.

@leejk9592
Copy link

v1.37.0 has the same problem.
It seems that b itself needs to be guarded by a lock, not just b.conf.

WARNING: DATA RACE
Write at 0x00c0026b4af0 by goroutine 621:
  github.com/Shopify/sarama.(*Broker).Open()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/broker.go:176 +0x39d
  github.com/Shopify/sarama.(*client).Coordinator()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/client.go:609 +0xb9
  github.com/Shopify/sarama.(*consumerGroupSession).heartbeatLoop()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/consumer_group.go:927 +0x41b
  github.com/Shopify/sarama.newConsumerGroupSession.func3()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/consumer_group.go:754 +0x39

Previous read at 0x00c0026b4af0 by goroutine 72:
  github.com/Shopify/sarama.(*Broker).handleResponsePromise()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/broker.go:1041 +0x1e9
  github.com/Shopify/sarama.(*Broker).sendAndReceive()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/broker.go:1035 +0xbc
  github.com/Shopify/sarama.(*Broker).CommitOffset()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/broker.go:509 +0x5c
  github.com/Shopify/sarama.(*offsetManager).flushToBroker()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/offset_manager.go:268 +0x5c
  github.com/Shopify/sarama.(*offsetManager).Commit()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/offset_manager.go:252 +0x2e
  github.com/Shopify/sarama.(*offsetManager).mainLoop()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/offset_manager.go:244 +0x14b
  github.com/Shopify/sarama.(*offsetManager).mainLoop-fm()
      <autogenerated>:1 +0x39
  github.com/Shopify/sarama.withRecover()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/utils.go:43 +0x48
  github.com/Shopify/sarama.newOffsetManagerFromClient.func1()
      /home/jenkins/agent/workspace/kov_kov-476/vendor/github.com/Shopify/sarama/offset_manager.go:81 +0x39

@alkmc
Copy link

alkmc commented Oct 11, 2022

Anyone? This is upgrade blocker for me.

@ctrix
Copy link

ctrix commented Oct 26, 2022

I have the identical issue with 1.37.2.

==================
WARNING: DATA RACE
Write at 0x00c00c074070 by goroutine 72:
  github.com/Shopify/sarama.(*Broker).Open()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/broker.go:176 +0x39d
  github.com/Shopify/sarama.(*client).anyBroker()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:758 +0x148
  github.com/Shopify/sarama.(*client).tryRefreshMetadata()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:981 +0x17e
  github.com/Shopify/sarama.(*client).tryRefreshMetadata.func2()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:976 +0x2ba
  github.com/Shopify/sarama.(*client).tryRefreshMetadata()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:1051 +0xfab
  github.com/Shopify/sarama.(*client).RefreshMetadata()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:519 +0x219
  github.com/Shopify/sarama.(*client).refreshMetadata()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:944 +0x97
  github.com/Shopify/sarama.(*client).backgroundMetadataUpdater()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:922 +0x2aa
  github.com/Shopify/sarama.(*client).backgroundMetadataUpdater-fm()
      <autogenerated>:1 +0x39
  github.com/Shopify/sarama.withRecover()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/utils.go:43 +0x48
  github.com/Shopify/sarama.NewClient.func1()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:205 +0x39

Previous read at 0x00c00c074070 by goroutine 69:
  github.com/Shopify/sarama.(*Broker).handleResponsePromise()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/broker.go:1041 +0x1e9
  github.com/Shopify/sarama.(*Broker).sendAndReceive()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/broker.go:1035 +0xbc
  github.com/Shopify/sarama.(*Broker).ApiVersions()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/broker.go:606 +0x5c
  github.com/Shopify/sarama.(*Broker).Open.func1.1()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/broker.go:186 +0x131
  runtime.deferreturn()
      /usr/local/go/src/runtime/panic.go:436 +0x32
  github.com/Shopify/sarama.withRecover()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/utils.go:43 +0x48
  github.com/Shopify/sarama.(*Broker).Open.func2()
      /home/ctrix/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/broker.go:178 +0x39
     

@stuart-byma
Copy link

Another example in 1.37.2

WARNING: DATA RACE
Write at 0x00c0002ee770 by goroutine 145:
  github.com/Shopify/sarama.(*Broker).Open()
     github.com/Shopify/sarama/broker.go:176 +0x39d
  github.com/Shopify/sarama.(*client).cachedLeader()
      github.com/Shopify/sarama/client.go:866 +0x29d
  github.com/Shopify/sarama.(*client).Leader()
     github.com/Shopify/sarama/client.go:459 +0x67
  github.com/Shopify/sarama.(*partitionProducer).updateLeader.func1()
     Shopify/sarama/async_producer.go:751 +0x197
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      github.com/eapache/go-resiliency/breaker/breaker.go:85 +0x7b
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      github.com/eapache/go-resiliency/breaker/breaker.go:86 +0x44
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      github.com/eapache/go-resiliency/breaker/breaker.go:55 +0x92
  github.com/Shopify/sarama.(*partitionProducer).updateLeader()
      github.com/Shopify/sarama/async_producer.go:746 +0x5e
  github.com/Shopify/sarama.(*partitionProducer).dispatch()
      /Shopify/sarama/async_producer.go:675 +0xc8a
  github.com/Shopify/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x39
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x48
  github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer.func1()
      github.com/Shopify/sarama/async_producer.go:599 +0x39

Previous read at 0x00c0002ee770 by goroutine 152:
  github.com/Shopify/sarama.(*Broker).handleResponsePromise()
      github.com/Shopify/sarama/broker.go:1041 +0x1e9
  github.com/Shopify/sarama.(*Broker).sendAndReceive()
      github.com/Shopify/sarama/broker.go:1035 +0xbc
  github.com/Shopify/sarama.(*Broker).ApiVersions()
      github.com/Shopify/sarama/broker.go:606 +0x5c
  github.com/Shopify/sarama.(*Broker).Open.func1.1()
      github.com/Shopify/sarama/broker.go:186 +0x131
  runtime.deferreturn()
      /usr/local/src/runtime/panic.go:436 +0x32
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x48
  github.com/Shopify/sarama.(*Broker).Open.func2()
      github.com/Shopify/sarama/broker.go:178 +0x39

@jesse-shopify
Copy link

I'm also interested in the status of this.

@vincentbernat
Copy link
Contributor

vincentbernat commented Dec 30, 2022

This was introduced in 5b04c98

vincentbernat added a commit to vincentbernat/sarama that referenced this issue Dec 30, 2022
A race condition was introduced in
5b04c98 (feat(metrics): track
consumer-fetch-response-size) when passing the metric registry around to
get additional metrics. Notably, `handleResponsePromise()` could access
the registry after the broker has been closed and is tentatively being
reopened. This triggers a data race because `b.metricRegistry` is being
set during `Open()` (as it is part of the configuration).

We fix this by reverting the addition of `handleResponsePromise()` as a
method to `Broker`. Instead, we provide it with the metric registry as
an argument. An alternative would have been to get the metric registry
before the `select` call. However, removing it as a method makes it
clearer that this function is not allowed to access the broker internals,
as they are not protected by the lock and the broker may not be alive
any more.

All the following calls to `b.metricRegistry` are done while the lock is
held:

- inside `Open()`, the lock is held, including inside the goroutine
- inside `Close()`, the lock is held
- `AsyncProduce()` has a contract that it must be called while the broker
  is open, we keep a copy of the metric registry to use inside the callback
- `sendInternal()`, has a contract that the lock should be held
- `authenticateViaSASLv1()` is called from `Open()` and
  `sendWithPromise()`, both of them holding the lock
- `sendAndReceiveSASLHandshake()` is called from
  `authenticateViaSASLv0/v1()`, which are called from `Open()` and
  `sendWithPromise()`

I am unsure about `responseReceiver()`; however, it also calls
`b.readFull()`, which accesses `b.conn`, so I suppose it is safe.

This leaves `sendAndReceive()`, which calls `send()`, which in turn
calls `sendWithPromise()`, which takes the lock. We move the lock to
`sendAndReceive()` instead. `send()` is only called from
`sendAndReceive()`, and we add the lock around the other caller of
`sendWithPromise()`.

The test has been stolen from IBM#2393 from @samuelhewitt. IBM#2393 is an
alternative proposal using a RW lock to protect `b.metricRegistry`.

Fix IBM#2320
dnwe pushed a commit that referenced this issue Jan 10, 2023
A race condition was introduced in
5b04c98 (feat(metrics): track
consumer-fetch-response-size) when passing the metric registry around to
get additional metrics. Notably, `handleResponsePromise()` could access
the registry after the broker has been closed and is tentatively being
reopened. This triggers a data race because `b.metricRegistry` is being
set during `Open()` (as it is part of the configuration).

We fix this by reverting the addition of `handleResponsePromise()` as a
method to `Broker`. Instead, we provide it with the metric registry as
an argument. An alternative would have been to get the metric registry
before the `select` call. However, removing it as a method makes it
clearer that this function is not allowed to access the broker internals,
as they are not protected by the lock and the broker may not be alive
any more.

All the following calls to `b.metricRegistry` are done while the lock is
held:

- inside `Open()`, the lock is held, including inside the goroutine
- inside `Close()`, the lock is held
- `AsyncProduce()` has a contract that it must be called while the broker
  is open, we keep a copy of the metric registry to use inside the callback
- `sendInternal()`, has a contract that the lock should be held
- `authenticateViaSASLv1()` is called from `Open()` and
  `sendWithPromise()`, both of them holding the lock
- `sendAndReceiveSASLHandshake()` is called from
  `authenticateViaSASLv0/v1()`, which are called from `Open()` and
  `sendWithPromise()`

I am unsure about `responseReceiver()`; however, it also calls
`b.readFull()`, which accesses `b.conn`, so I suppose it is safe.

This leaves `sendAndReceive()`, which calls `send()`, which in turn
calls `sendWithPromise()`, which takes the lock. We move the lock to
`sendAndReceive()` instead. `send()` is only called from
`sendAndReceive()`, and we add the lock around the other caller of
`sendWithPromise()`.

The test has been stolen from #2393 from @samuelhewitt. #2393 is an
alternative proposal using a RW lock to protect `b.metricRegistry`.

Fix #2320
@stuart-byma
Copy link

The issue still remains in 1.38.1, I still encounter:

WARNING: DATA RACE
Read at 0x00c0000c2070 by goroutine 152:
github.com/Shopify/sarama.(*Broker).AsyncProduce()
github.com/Shopify/sarama/broker.go:432 +0x66
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
github.com/Shopify/sarama/async_producer.go:828 +0x14f
github.com/Shopify/sarama.withRecover()
github.com/Shopify/sarama/utils.go:43 +0x48
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func4()
github.com/Shopify/sarama/async_producer.go:793 +0x39

Previous write at 0x00c0000c2070 by goroutine 146:
github.com/Shopify/sarama.(*Broker).Open()
github.com/Shopify/sarama/broker.go:178 +0x39d
github.com/Shopify/sarama.(*client).cachedLeader()
github.com/Shopify/sarama/client.go:874 +0x2c4
github.com/Shopify/sarama.(*client).LeaderAndEpoch()
github.com/Shopify/sarama/client.go:468 +0x67
github.com/Shopify/sarama.(*client).Leader()
github.com/Shopify/sarama/client.go:459 +0x48
github.com/Shopify/sarama.(*partitionProducer).dispatch()
github.com/Shopify/sarama/async_producer.go:631 +0xdd
github.com/Shopify/sarama.(*partitionProducer).dispatch-fm()
:1 +0x39
github.com/Shopify/sarama.withRecover()
github.com/Shopify/sarama/utils.go:43 +0x48
github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer.func1()
github.com/Shopify/sarama/async_producer.go:599 +0x39

Goroutine 152 (running) created at:
github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer()
github.com/Shopify/sarama/async_producer.go:793 +0x6b5
github.com/Shopify/sarama.(*asyncProducer).getBrokerProducer()
github.com/Shopify/sarama/async_producer.go:1342 +0x124
github.com/Shopify/sarama.(*partitionProducer).dispatch()
github.com/Shopify/sarama/async_producer.go:633 +0x196
github.com/Shopify/sarama.(*partitionProducer).dispatch-fm()
:1 +0x39
github.com/Shopify/sarama.withRecover()
github.com/Shopify/sarama/utils.go:43 +0x48
github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer.func1()
github.com/Shopify/sarama/async_producer.go:599 +0x39

Goroutine 146 (running) created at:
github.com/Shopify/sarama.(*asyncProducer).newPartitionProducer()
github.com/Shopify/sarama/async_producer.go:599 +0x41d
github.com/Shopify/sarama.(*topicProducer).dispatch()
github.com/Shopify/sarama/async_producer.go:509 +0x1ba
github.com/Shopify/sarama.(*topicProducer).dispatch-fm()
:1 +0x39
github.com/Shopify/sarama.withRecover()
github.com/Shopify/sarama/utils.go:43 +0x48
github.com/Shopify/sarama.(*asyncProducer).newTopicProducer.func1()
github.com/Shopify/sarama/async_producer.go:494 +0x39

Should I file a separate issue for this?

vincentbernat added a commit to vincentbernat/sarama that referenced this issue Jan 26, 2023
On failure, `Broker.Open()` can be called several times while a producer
is running. In IBM#2409, it was assumed that AsyncProduce can only be
called with an open broker; however, this should be read as "a user
should call it after opening the broker". The broker could be
disconnected and in progress of being reconnected. This is not hard to
fix as we already have a lock protecting the creation of the registry:
just don't create a new metric registry when attempting to reopen the
broker.

Fix IBM#2320 (again)
dnwe pushed a commit that referenced this issue Jan 26, 2023
)

On failure, `Broker.Open()` can be called several times while a producer
is running. In #2409, it was assumed that AsyncProduce can only be
called with an open broker; however, this should be read as "a user
should call it after opening the broker". The broker could be
disconnected and in progress of being reconnected. This is not hard to
fix as we already have a lock protecting the creation of the registry:
just don't create a new metric registry when attempting to reopen the
broker.

Fix #2320 (again)
@vincentbernat
Copy link
Contributor

@stuart-byma Can you try the fix in #2428?

@leejk9592
Copy link

@vincentbernat

#2428 resolves the problem, thx!

@duglin
Copy link

duglin commented Oct 25, 2023

I'm on v1.40.1 (which I believe has the fix) but I'm still seeing the issue. Is there something in my code I should look for that might be causing this?

Previous write at 0x00c0001f51f0 by goroutine 19:
  github.com/IBM/sarama.(*Broker).Open()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/broker.go:179 +0x3d2
  github.com/IBM/sarama.(*client).Coordinator()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/client.go:621 +0xb7
  github.com/IBM/sarama.(*nopCloserClient).Coordinator()
      <autogenerated>:1 +0x63
  github.com/IBM/sarama.(*consumerGroup).newSession()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/consumer_group.go:266 +0xd9
  github.com/IBM/sarama.(*consumerGroup).Consume()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/consumer_group.go:206 +0x1fd

and

Read at 0x00c0001f51f0 by goroutine 31:
  github.com/IBM/sarama.(*Broker).AsyncProduce()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/broker.go:434 +0x5d
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/async_producer.go:828 +0x264
  github.com/IBM/sarama.withRecover()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func4()
      /root/go/pkg/mod/github.com/!i!b!m/sarama@v1.40.1/async_producer.go:793 +0x33

@duglin
Copy link

duglin commented Oct 25, 2023

Also, why isn't some kind of mutex around that block of code needed? I would think we'd need to lock things if one thread is trying to read a field and another thread is trying to write it. The "if" doesn't prevent a concurrent read/write, or two writes if a 2nd thread sneaks in between the "if" and the assignment.

@vincentbernat
Copy link
Contributor

A mutex specifically for the registry would kill the performance. From my understanding, it is not needed. The registry is only created on first use (protected by a lock), and after that it won't be modified anymore. It seems that a broker can be used without calling Open() first; I don't know how this could happen.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment