From 1cdea0aa2e8fe50e4702dac828c7c1da72c19773 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 21 Sep 2021 00:35:25 +0100 Subject: [PATCH 1/4] chore: populate the missing kafka versions As discussed under #2034, should simplify linkedin/Burrow usage too. --- utils.go | 26 ++++++++++++++++++++++++++ utils_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/utils.go b/utils.go index 421193ded..4ff973033 100644 --- a/utils.go +++ b/utils.go @@ -148,22 +148,35 @@ var ( V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) + V0_10_2_2 = newKafkaVersion(0, 10, 2, 2) V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) + V1_0_1_0 = newKafkaVersion(1, 0, 1, 0) + V1_0_2_0 = newKafkaVersion(1, 0, 2, 0) V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) + V2_1_1_0 = newKafkaVersion(2, 1, 1, 0) V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) + V2_2_1_0 = newKafkaVersion(2, 2, 1, 0) + V2_2_2_0 = newKafkaVersion(2, 2, 2, 0) V2_3_0_0 = newKafkaVersion(2, 3, 0, 0) + V2_3_1_0 = newKafkaVersion(2, 3, 1, 0) V2_4_0_0 = newKafkaVersion(2, 4, 0, 0) + V2_4_1_0 = newKafkaVersion(2, 4, 1, 0) V2_5_0_0 = newKafkaVersion(2, 5, 0, 0) + V2_5_1_0 = newKafkaVersion(2, 5, 1, 0) V2_6_0_0 = newKafkaVersion(2, 6, 0, 0) + V2_6_1_0 = newKafkaVersion(2, 6, 1, 0) + V2_6_2_0 = newKafkaVersion(2, 6, 2, 0) V2_7_0_0 = newKafkaVersion(2, 7, 0, 0) + V2_7_1_0 = newKafkaVersion(2, 7, 1, 0) V2_8_0_0 = newKafkaVersion(2, 8, 0, 0) + V2_8_1_0 = newKafkaVersion(2, 8, 1, 0) V3_0_0_0 = newKafkaVersion(3, 0, 0, 0) SupportedVersions = []KafkaVersion{ @@ -178,22 +191,35 @@ var ( V0_10_1_1, V0_10_2_0, V0_10_2_1, + V0_10_2_2, V0_11_0_0, V0_11_0_1, V0_11_0_2, V1_0_0_0, + V1_0_1_0, + V1_0_2_0, V1_1_0_0, V1_1_1_0, V2_0_0_0, V2_0_1_0, V2_1_0_0, + V2_1_1_0, V2_2_0_0, + V2_2_1_0, + V2_2_2_0, V2_3_0_0, + V2_3_1_0, V2_4_0_0, + V2_4_1_0, V2_5_0_0, + V2_5_1_0, V2_6_0_0, + V2_6_1_0, + V2_6_2_0, V2_7_0_0, + V2_7_1_0, V2_8_0_0, + V2_8_1_0, V3_0_0_0, } MinVersion = V0_8_2_0 diff --git a/utils_test.go b/utils_test.go index 013620e55..62bb7c44e 100644 --- a/utils_test.go +++ b/utils_test.go @@ -18,10 +18,58 @@ func TestVersionCompare(t *testing.T) { if V0_8_2_1.IsAtLeast(V0_10_0_0) { t.Error("0.8.2.1 >= 0.10.0.0") } + if !V1_0_0_0.IsAtLeast(V0_9_0_0) { + t.Error("! 1.0.0.0 >= 0.9.0.0") + } + if V0_9_0_0.IsAtLeast(V1_0_0_0) { + t.Error("0.9.0.0 >= 1.0.0.0") + } } func TestVersionParsing(t *testing.T) { - validVersions := []string{"0.8.2.0", "0.8.2.1", "0.9.0.0", "0.10.2.0", "1.0.0"} + validVersions := []string{ + "0.8.2.0", + "0.8.2.1", + "0.8.2.2", + "0.9.0.0", + "0.9.0.1", + "0.10.0.0", + "0.10.0.1", + "0.10.1.0", + "0.10.1.1", + "0.10.2.0", + "0.10.2.1", + "0.10.2.2", + "0.11.0.0", + "0.11.0.1", + "0.11.0.2", + "1.0.0", + "1.0.1", + "1.0.2", + "1.1.0", + "1.1.1", + "2.0.0", + "2.0.1", + "2.1.0", + "2.1.1", + "2.2.0", + "2.2.1", + "2.2.2", + "2.3.0", + "2.3.1", + "2.4.0", + "2.4.1", + "2.5.0", + "2.5.1", + "2.6.0", + "2.6.1", + "2.6.2", + "2.7.0", + "2.7.1", + "2.8.0", + "2.8.1", + "3.0.0", + } for _, s := range validVersions { v, err := ParseKafkaVersion(s) if err != nil { From aa7ce8517c409b882ffefc07140fdfd0a64150ff Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 22 Sep 2021 13:31:47 +0100 Subject: [PATCH 2/4] fix: prevent data race on conf.Version Move conf.Version check out of background goroutine to prevent read+write race in test. Wrap TestClientController subtests in t.Run(...) whilst in here. --- broker.go | 4 +++- client_test.go | 48 ++++++++++++++++++++++++++---------------------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/broker.go b/broker.go index a67a8bb56..ed92a669d 100644 --- a/broker.go +++ b/broker.go @@ -150,6 +150,8 @@ func (b *Broker) Open(conf *Config) error { return err } + usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest + b.lock.Lock() go withRecover(func() { @@ -159,7 +161,7 @@ func (b *Broker) Open(conf *Config) error { // Send an ApiVersionsRequest to identify the client (KIP-511). // Ideally Sarama would use the response to control protocol versions, // but for now just fire-and-forget just to send - if conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest { + if usingApiVersionsRequests { _, err = b.ApiVersions(&ApiVersionsRequest{ Version: 3, ClientSoftwareName: defaultClientSoftwareName, diff --git a/client_test.go b/client_test.go index d54230881..8cb29d5af 100644 --- a/client_test.go +++ b/client_test.go @@ -687,30 +687,34 @@ func TestClientController(t *testing.T) { cfg := NewTestConfig() // test kafka version greater than 0.10.0.0 - cfg.Version = V0_10_0_0 - client1, err := NewClient([]string{seedBroker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - defer safeClose(t, client1) - broker, err := client1.Controller() - if err != nil { - t.Fatal(err) - } - if broker.Addr() != controllerBroker.Addr() { - t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr()) - } + t.Run("V0_10_0_0", func(t *testing.T) { + cfg.Version = V0_10_0_0 + client1, err := NewClient([]string{seedBroker.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, client1) + broker, err := client1.Controller() + if err != nil { + t.Fatal(err) + } + if broker.Addr() != controllerBroker.Addr() { + t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr()) + } + }) // test kafka version earlier than 0.10.0.0 - cfg.Version = V0_9_0_1 - client2, err := NewClient([]string{seedBroker.Addr()}, cfg) - if err != nil { - t.Fatal(err) - } - defer safeClose(t, client2) - if _, err = client2.Controller(); err != ErrUnsupportedVersion { - t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err) - } + t.Run("V0_9_0_1", func(t *testing.T) { + cfg.Version = V0_9_0_1 + client2, err := NewClient([]string{seedBroker.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, client2) + if _, err = client2.Controller(); err != ErrUnsupportedVersion { + t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err) + } + }) } func TestClientMetadataTimeout(t *testing.T) { From 01c7c43010fd8dc00778f2741a328fd63c659a9d Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 22 Sep 2021 14:14:40 +0100 Subject: [PATCH 3/4] chore: bump up the test timeout to 10m We seem to be hitting `*** Test killed: ran too long (7m0s)` for the functional test(s), and whilst I do see a large number of hung goroutines in the backtrace, I can also see that it is mid teardown of the docker containers, so it seems to have finished the actual testing and might just be taking longer than usual to `docker stop` the kafka containers. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 4714d7798..8f8fc6bdb 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ default: fmt get update test lint GO := go GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 6m -coverprofile=profile.out -covermode=atomic +GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 10m -coverprofile=profile.out -covermode=atomic FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go') TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') From 8a2c8bae2c5f49d915fdb8121c3ad73e83c0938a Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 22 Sep 2021 14:46:16 +0100 Subject: [PATCH 4/4] fix(test): use copy in mockbroker SetHandlersByMap Clone the given handlerMap (to prevent race conditions on modification), and require further calls to SetHandlerByMap if the handlers need updating mid test. --- admin_test.go | 3 +++ mockbroker.go | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/admin_test.go b/admin_test.go index 791b30cd4..f6e06fd62 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1357,6 +1357,7 @@ func TestDeleteOffset(t *testing.T) { // Test NoError handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrNoError) + seedBroker.SetHandlerByMap(handlerMap) err = admin.DeleteConsumerGroupOffset(group, topic, partition) if err != nil { t.Fatalf("DeleteConsumerGroupOffset failed with error %v", err) @@ -1364,6 +1365,7 @@ func TestDeleteOffset(t *testing.T) { // Test Error handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNotCoordinatorForConsumer, topic, partition, ErrNoError) + seedBroker.SetHandlerByMap(handlerMap) err = admin.DeleteConsumerGroupOffset(group, topic, partition) if err != ErrNotCoordinatorForConsumer { t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrNotCoordinatorForConsumer) @@ -1371,6 +1373,7 @@ func TestDeleteOffset(t *testing.T) { // Test Error for partition handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNoError, topic, partition, ErrGroupSubscribedToTopic) + seedBroker.SetHandlerByMap(handlerMap) err = admin.DeleteConsumerGroupOffset(group, topic, partition) if err != ErrGroupSubscribedToTopic { t.Fatalf("DeleteConsumerGroupOffset should have failed with error %v", ErrGroupSubscribedToTopic) diff --git a/mockbroker.go b/mockbroker.go index f064c45b3..9166f6efb 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -83,9 +83,13 @@ func (b *MockBroker) SetLatency(latency time.Duration) { // and uses the found MockResponse instance to generate an appropriate reply. // If the request type is not found in the map then nothing is sent. func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { + fnMap := make(map[string]MockResponse) + for k, v := range handlerMap { + fnMap[k] = v + } b.setHandler(func(req *request) (res encoderWithHeader) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() - mockResponse := handlerMap[reqTypeName] + mockResponse := fnMap[reqTypeName] if mockResponse == nil { return nil }