From 98ec384372ecbb9d86036c6f210f840c45dbfa70 Mon Sep 17 00:00:00 2001 From: Hao Sun Date: Wed, 6 Sep 2023 20:04:50 -0700 Subject: [PATCH] fix: use least loaded broker to refresh metadata Seed brokers never change after client initialization. If the first seed broker became stale (still online, but moved to other Kafka cluster), Sarama client may use this stale broker to get the wrong metadata. To avoid using the stale broker to do metadata refresh, we will choose the least loaded broker in the cached broker list which is the similar to how the Java client implementation works: https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736 Contributes-to: IBM/sarama#2637 Signed-off-by: Hao Sun --- admin_test.go | 12 +++-- async_producer_test.go | 69 +++++++++++++++----------- client.go | 58 ++++++++-------------- client_test.go | 110 +++++++++++++++++++++++++++++++---------- consumer_test.go | 60 ++++++++++++---------- functional_test.go | 2 +- mockbroker.go | 14 ++++++ offset_manager_test.go | 26 +++++----- sync_producer_test.go | 2 +- 9 files changed, 216 insertions(+), 137 deletions(-) diff --git a/admin_test.go b/admin_test.go index 3387a7270..2b70aa9bb 100644 --- a/admin_test.go +++ b/admin_test.go @@ -1712,11 +1712,15 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) { seedBroker1.BrokerID(), b.ID()) } + metadataResponse := NewMockMetadataResponse(t). + SetController(seedBroker2.BrokerID()). + SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()). + SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()) seedBroker1.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetController(seedBroker2.BrokerID()). - SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()). - SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()), + "MetadataRequest": metadataResponse, + }) + seedBroker2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": metadataResponse, }) if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() { diff --git a/async_producer_test.go b/async_producer_test.go index b3eebe099..c192235cc 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { // When: a broker connection gets reset by a broker (network glitch, restart, you name it). leader.Close() // producer should get EOF leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles - seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again + leader.Returns(metadataResponse) // tell it to go to broker 2 again // Then: a produced message goes through the new broker connection. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} @@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) - seedBroker.Returns(metadataLeader2) + leader1.Returns(metadataLeader2) leader2.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader1) + leader2.Returns(metadataLeader1) leader1.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader1) + leader1.Returns(metadataLeader1) leader1.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader2) + leader1.Returns(metadataLeader2) prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) @@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) leader1.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader2) + leader1.Returns(metadataLeader2) leader2.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader1) + leader2.Returns(metadataLeader1) leader1.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader1) + leader1.Returns(metadataLeader1) leader1.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader2) + leader1.Returns(metadataLeader2) leader2.Returns(prodSuccess) expectResults(t, producer, 1, 0) @@ -739,16 +739,17 @@ func TestAsyncProducerBrokerRestart(t *testing.T) { leader := NewMockBroker(t, 2) var leaderLock sync.Mutex - - // The seed broker only handles Metadata request - seedBroker.setHandler(func(req *request) (res encoderWithHeader) { + metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) { leaderLock.Lock() defer leaderLock.Unlock() metadataLeader := new(MetadataResponse) metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) return metadataLeader - }) + } + + // The seed broker only handles Metadata request in bootstrap + seedBroker.setHandler(metadataRequestHandlerFunc) var emptyValues int32 = 0 @@ -770,7 +771,7 @@ func TestAsyncProducerBrokerRestart(t *testing.T) { } } - leader.setHandler(func(req *request) (res encoderWithHeader) { + failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) { countRecordsWithEmptyValue(req) time.Sleep(50 * time.Millisecond) @@ -778,6 +779,19 @@ func TestAsyncProducerBrokerRestart(t *testing.T) { prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) return prodSuccess + } + + succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) { + countRecordsWithEmptyValue(req) + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + return prodSuccess + } + + leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{ + "ProduceRequest": failedProduceRequestHandlerFunc, + "MetadataRequest": metadataRequestHandlerFunc, }) config := NewTestConfig() @@ -816,12 +830,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) { leaderLock.Lock() leader = NewMockBroker(t, 2) leaderLock.Unlock() - leader.setHandler(func(req *request) (res encoderWithHeader) { - countRecordsWithEmptyValue(req) - - prodSuccess := new(ProduceResponse) - prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) - return prodSuccess + leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{ + "ProduceRequest": succeededProduceRequestHandlerFunc, + "MetadataRequest": metadataRequestHandlerFunc, }) wg.Wait() @@ -938,7 +949,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} // tell partition 0 to go to that broker again - seedBroker.Returns(metadataResponse) + leader.Returns(metadataResponse) // succeed this time prodSuccess = new(ProduceResponse) @@ -994,14 +1005,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { time.Sleep(50 * time.Millisecond) - leader.SetHandlerByMap(map[string]MockResponse{ - "ProduceRequest": NewMockProduceResponse(t). - SetVersion(0). - SetError("my_topic", 0, ErrNoError), - }) - // tell partition 0 to go to that broker again - seedBroker.Returns(metadataResponse) + leader.Returns(metadataResponse) + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) // succeed this time expectResults(t, producer, 5, 0) @@ -1010,6 +1018,9 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} } + prodSuccess = new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) expectResults(t, producer, 5, 0) // shutdown @@ -1051,7 +1062,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) leader.Returns(prodNotLeader) - seedBroker.Returns(metadataLeader) + leader.Returns(metadataLeader) prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) diff --git a/client.go b/client.go index bbba58567..5e665eaef 100644 --- a/client.go +++ b/client.go @@ -260,7 +260,7 @@ func (client *client) Broker(brokerID int32) (*Broker, error) { func (client *client) InitProducerID() (*InitProducerIDResponse, error) { // FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go? brokerErrors := make([]error, 0) - for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() { + for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() { request := &InitProducerIDRequest{} if client.conf.Version.IsAtLeast(V2_7_0_0) { @@ -763,22 +763,21 @@ func (client *client) registerBroker(broker *Broker) { } } -// deregisterBroker removes a broker from the seedsBroker list, and if it's -// not the seedbroker, removes it from brokers map completely. +// deregisterBroker removes a broker from the broker list, and if it's +// not in the broker list, removes it from seedBrokers. func (client *client) deregisterBroker(broker *Broker) { client.lock.Lock() defer client.lock.Unlock() + _, ok := client.brokers[broker.ID()] + if ok { + Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr()) + delete(client.brokers, broker.ID()) + return + } if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] { client.deadSeeds = append(client.deadSeeds, broker) client.seedBrokers = client.seedBrokers[1:] - } else { - // we do this so that our loop in `tryRefreshMetadata` doesn't go on forever, - // but we really shouldn't have to; once that loop is made better this case can be - // removed, and the function generally can be renamed from `deregisterBroker` to - // `nextSeedBroker` or something - DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr()) - delete(client.brokers, broker.ID()) } } @@ -791,33 +790,12 @@ func (client *client) resurrectDeadBrokers() { client.deadSeeds = nil } -func (client *client) anyBroker() *Broker { - client.lock.RLock() - defer client.lock.RUnlock() - - if len(client.seedBrokers) > 0 { - _ = client.seedBrokers[0].Open(client.conf) - return client.seedBrokers[0] - } - - // not guaranteed to be random *or* deterministic - for _, broker := range client.brokers { - _ = broker.Open(client.conf) - return broker - } - - return nil -} - +// LeastLoadedBroker returns the broker with the least pending requests. +// Firstly, choose the broker from cached broker list. If the broker list is empty, choose from seed brokers. func (client *client) LeastLoadedBroker() *Broker { client.lock.RLock() defer client.lock.RUnlock() - if len(client.seedBrokers) > 0 { - _ = client.seedBrokers[0].Open(client.conf) - return client.seedBrokers[0] - } - var leastLoadedBroker *Broker pendingRequests := math.MaxInt for _, broker := range client.brokers { @@ -826,10 +804,16 @@ func (client *client) LeastLoadedBroker() *Broker { leastLoadedBroker = broker } } - if leastLoadedBroker != nil { _ = leastLoadedBroker.Open(client.conf) + return leastLoadedBroker } + + if len(client.seedBrokers) > 0 { + _ = client.seedBrokers[0].Open(client.conf) + return client.seedBrokers[0] + } + return leastLoadedBroker } @@ -1032,9 +1016,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, return err } - broker := client.anyBroker() + broker := client.LeastLoadedBroker() brokerErrors := make([]error, 0) - for ; broker != nil && !pastDeadline(0); broker = client.anyBroker() { + for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() { allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation if len(topics) > 0 { DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr) @@ -1212,7 +1196,7 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo } brokerErrors := make([]error, 0) - for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() { + for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() { DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr()) request := new(FindCoordinatorRequest) diff --git a/client_test.go b/client_test.go index c9d5b56ad..78243bce0 100644 --- a/client_test.go +++ b/client_test.go @@ -334,11 +334,11 @@ func TestClientGetOffset(t *testing.T) { } leader.Close() - seedBroker.Returns(metadata) leader = NewMockBrokerAddr(t, 2, leaderAddr) offsetResponse = new(OffsetResponse) offsetResponse.AddTopicPartition("foo", 0, 456) + leader.Returns(metadata) leader.Returns(offsetResponse) offset, err = client.GetOffset("foo", 0, OffsetNewest) @@ -445,12 +445,11 @@ func TestClientReceivingPartialMetadata(t *testing.T) { replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()} metadataPartial := new(MetadataResponse) - metadataPartial.AddBroker(seedBroker.Addr(), 1) metadataPartial.AddBroker(leader.Addr(), 5) metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable) metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError) metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable) - seedBroker.Returns(metadataPartial) + leader.Returns(metadataPartial) if err := client.RefreshMetadata("new_topic"); err != nil { t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error") @@ -469,7 +468,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) { // If we are asking for the leader of a partition that didn't have a leader before, // we will do another metadata request. - seedBroker.Returns(metadataPartial) + leader.Returns(metadataPartial) // Still no leader for the partition, so asking for it should return an error. _, err = client.Leader("new_topic", 1) @@ -493,7 +492,7 @@ func TestClientRefreshBehaviour(t *testing.T) { metadataResponse2 := new(MetadataResponse) metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse2) + leader.Returns(metadataResponse2) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -573,9 +572,13 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { t.Error("Meta broker is not 2") } - metadataResponse2 := new(MetadataResponse) - metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID()) - seedBroker.Returns(metadataResponse2) + metadataResponse2 := NewMockMetadataResponse(t).SetBroker(leader.Addr(), leader.BrokerID()) + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": metadataResponse2, + }) + leader.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": metadataResponse2, + }) if err := client.RefreshMetadata(); err != nil { t.Error(err) @@ -611,9 +614,13 @@ func TestClientGetBroker(t *testing.T) { t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr()) } - metadataResponse2 := new(MetadataResponse) - metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) - seedBroker.Returns(metadataResponse2) + metadataResponse2 := NewMockMetadataResponse(t).SetBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": metadataResponse2, + }) + leader.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": metadataResponse2, + }) if err := client.RefreshMetadata(); err != nil { t.Error(err) @@ -856,13 +863,11 @@ func TestClientUpdateMetadataErrorAndRetry(t *testing.T) { func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) - staleCoordinator := NewMockBroker(t, 2) - freshCoordinator := NewMockBroker(t, 3) + coordinator := NewMockBroker(t, 2) - replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} + replicas := []int32{coordinator.BrokerID()} metadataResponse1 := new(MetadataResponse) - metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID()) - metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID()) + metadataResponse1.AddBroker(coordinator.Addr(), coordinator.BrokerID()) metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse1) @@ -873,20 +878,72 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { coordinatorResponse1 := new(ConsumerMetadataResponse) coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable - seedBroker.Returns(coordinatorResponse1) + coordinator.Returns(coordinatorResponse1) coordinatorResponse2 := new(ConsumerMetadataResponse) - coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID() + coordinatorResponse2.CoordinatorID = coordinator.BrokerID() coordinatorResponse2.CoordinatorHost = "127.0.0.1" - coordinatorResponse2.CoordinatorPort = staleCoordinator.Port() + coordinatorResponse2.CoordinatorPort = coordinator.Port() - seedBroker.Returns(coordinatorResponse2) + coordinator.Returns(coordinatorResponse2) broker, err := client.Coordinator("my_group") if err != nil { t.Error(err) } + if coordinator.Addr() != broker.Addr() { + t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr()) + } + + if coordinator.BrokerID() != broker.ID() { + t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID()) + } + + // Grab the cached value + broker2, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + + if broker2.Addr() != broker.Addr() { + t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr()) + } + + coordinator.Close() + seedBroker.Close() + safeClose(t, client) +} + +func TestClientCoordinatorChangeWithConsumerOffsetsTopic(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + staleCoordinator := NewMockBroker(t, 2) + freshCoordinator := NewMockBroker(t, 3) + + replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID()) + metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID()) + metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) + seedBroker.Returns(metadataResponse1) + + client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + + findCoordinatorResponse := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", staleCoordinator) + staleCoordinator.SetHandlerByMap(map[string]MockResponse{ + "FindCoordinatorRequest": findCoordinatorResponse, + }) + freshCoordinator.SetHandlerByMap(map[string]MockResponse{ + "FindCoordinatorRequest": findCoordinatorResponse, + }) + broker, err := client.Coordinator("my_group") + if err != nil { + t.Error(err) + } + if staleCoordinator.Addr() != broker.Addr() { t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr()) } @@ -905,12 +962,13 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr()) } - coordinatorResponse3 := new(ConsumerMetadataResponse) - coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID() - coordinatorResponse3.CoordinatorHost = "127.0.0.1" - coordinatorResponse3.CoordinatorPort = freshCoordinator.Port() - - seedBroker.Returns(coordinatorResponse3) + findCoordinatorResponse2 := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", freshCoordinator) + staleCoordinator.SetHandlerByMap(map[string]MockResponse{ + "FindCoordinatorRequest": findCoordinatorResponse2, + }) + freshCoordinator.SetHandlerByMap(map[string]MockResponse{ + "FindCoordinatorRequest": findCoordinatorResponse2, + }) // Refresh the locally cached value because it's stale if err := client.RefreshCoordinator("my_group"); err != nil { diff --git a/consumer_test.go b/consumer_test.go index 4096bdd73..126cef6c3 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1142,6 +1142,10 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { SetMessage("my_topic", 0, 2, testMsg), }) + leader2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse1, + }) + client, err := NewClient([]string{leader1.Addr()}, cfg) if err != nil { t.Fatal(err) @@ -1362,21 +1366,23 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { seedBroker will give leader1 as serving my_topic/0 now * my_topic/1 -> leader1 will serve 0 messages`) - // seed broker tells that the new partition 0 leader is leader1 - seedBroker.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetLeader("my_topic", 0, leader1.BrokerID()). - SetLeader("my_topic", 1, leader1.BrokerID()). - SetBroker(leader0.Addr(), leader0.BrokerID()). - SetBroker(leader1.Addr(), leader1.BrokerID()). - SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), - }) - // leader0 says no longer leader of partition 0 fetchResponse := new(FetchResponse) fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) + metadataResponse := NewMockMetadataResponse(t). + SetLeader("my_topic", 0, leader1.BrokerID()). + SetLeader("my_topic", 1, leader1.BrokerID()). + SetBroker(leader0.Addr(), leader0.BrokerID()). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()) + leader0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": NewMockWrapper(fetchResponse), + "FetchRequest": NewMockWrapper(fetchResponse), + "MetadataRequest": metadataResponse, + }) + leader1.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": NewMockFetchResponse(t, 1), + "MetadataRequest": metadataResponse, }) time.Sleep(50 * time.Millisecond) @@ -1393,7 +1399,8 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg) } leader1.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": mockFetchResponse2, + "FetchRequest": mockFetchResponse2, + "MetadataRequest": metadataResponse, }) for i := 0; i < 8; i++ { @@ -1409,6 +1416,12 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { * my_topic/1 -> leader1 will return NotLeaderForPartition seedBroker will give leader0 as serving my_topic/1 now`) + metadataResponse2 := NewMockMetadataResponse(t). + SetLeader("my_topic", 0, leader1.BrokerID()). + SetLeader("my_topic", 1, leader0.BrokerID()). + SetBroker(leader0.Addr(), leader0.BrokerID()). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()) leader0.SetHandlerByMap(map[string]MockResponse{ "FetchRequest": NewMockFetchResponse(t, 1), }) @@ -1416,16 +1429,6 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { "FetchRequest": NewMockFetchResponse(t, 1), }) - // metadata assigns 0 to leader1 and 1 to leader0 - seedBroker.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetLeader("my_topic", 0, leader1.BrokerID()). - SetLeader("my_topic", 1, leader0.BrokerID()). - SetBroker(leader0.Addr(), leader0.BrokerID()). - SetBroker(leader1.Addr(), leader1.BrokerID()). - SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), - }) - // leader1 provides three more messages on partition0, says no longer leader of partition1 mockFetchResponse3 := NewMockFetchResponse(t, 3). SetMessage("my_topic", 0, int64(7), testMsg). @@ -1435,7 +1438,12 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { fetchResponse4.AddError("my_topic", 0, ErrNoError) fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition) leader1.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4), + "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4), + "MetadataRequest": metadataResponse2, + }) + leader0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": NewMockFetchResponse(t, 1), + "MetadataRequest": metadataResponse2, }) t.Log(` STAGE 5: @@ -1448,7 +1456,8 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg) } leader0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": mockFetchResponse4, + "FetchRequest": mockFetchResponse4, + "MetadataRequest": metadataResponse2, }) for i := 7; i < 10; i++ { @@ -1593,7 +1602,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { // Bring broker0 back to service. broker0 = NewMockBrokerAddr(t, 0, broker0Addr) broker0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": mockFetchResponse, + "MetadataRequest": mockMetadataResponse, + "FetchRequest": mockFetchResponse, }) // Read the rest of messages from both partitions. diff --git a/functional_test.go b/functional_test.go index 3607fd145..fd953ec7c 100644 --- a/functional_test.go +++ b/functional_test.go @@ -228,7 +228,7 @@ mainLoop: } for _, broker := range brokers { err := broker.Open(client.Config()) - if err != nil { + if err != nil && !errors.Is(err, ErrAlreadyConnected) { client.Close() continue retryLoop } diff --git a/mockbroker.go b/mockbroker.go index 8b73074fb..6e5d90608 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -98,6 +98,20 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { }) } +// SetHandlerFuncByMap defines mapping of Request types to RequestHandlerFunc. When a +// request is received by the broker, it looks up the request type in the map +// and invoke the found RequestHandlerFunc instance to generate an appropriate reply. +func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc) { + fnMap := make(map[string]requestHandlerFunc) + for k, v := range handlerMap { + fnMap[k] = v + } + b.setHandler(func(req *request) (res encoderWithHeader) { + reqTypeName := reflect.TypeOf(req.body).Elem().Name() + return fnMap[reqTypeName](req) + }) +} + // SetNotifier set a function that will get invoked whenever a request has been // processed successfully and will provide the number of bytes read and written func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) { diff --git a/offset_manager_test.go b/offset_manager_test.go index af95bc9e4..c3ac33641 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -37,7 +37,7 @@ func initOffsetManagerWithBackoffFunc( t.Fatal(err) } - broker.Returns(&ConsumerMetadataResponse{ + coordinator.Returns(&ConsumerMetadataResponse{ CoordinatorID: coordinator.BrokerID(), CoordinatorHost: "127.0.0.1", CoordinatorPort: coordinator.Port(), @@ -251,7 +251,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { // Refresh coordinator newCoordinator := NewMockBroker(t, 3) defer newCoordinator.Close() - broker.Returns(&ConsumerMetadataResponse{ + coordinator.Returns(&ConsumerMetadataResponse{ CoordinatorID: newCoordinator.BrokerID(), CoordinatorHost: "127.0.0.1", CoordinatorPort: newCoordinator.Port(), @@ -492,36 +492,34 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { ocResponse.AddError("my_topic", 1, ErrNoError) coordinator.Returns(ocResponse) - newCoordinator := NewMockBroker(t, 3) - defer newCoordinator.Close() - // For RefreshCoordinator() - broker.Returns(&ConsumerMetadataResponse{ - CoordinatorID: newCoordinator.BrokerID(), + coordinator.Returns(&ConsumerMetadataResponse{ + CoordinatorID: coordinator.BrokerID(), CoordinatorHost: "127.0.0.1", - CoordinatorPort: newCoordinator.Port(), + CoordinatorPort: coordinator.Port(), }) // Nothing in response.Errors at all ocResponse2 := new(OffsetCommitResponse) - newCoordinator.Returns(ocResponse2) + coordinator.Returns(ocResponse2) // No error, no need to refresh coordinator // Error on the wrong partition for this pom ocResponse3 := new(OffsetCommitResponse) ocResponse3.AddError("my_topic", 1, ErrNoError) - newCoordinator.Returns(ocResponse3) - - // No error, no need to refresh coordinator + coordinator.Returns(ocResponse3) // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block ocResponse4 := new(OffsetCommitResponse) ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition) - newCoordinator.Returns(ocResponse4) + coordinator.Returns(ocResponse4) + + newCoordinator := NewMockBroker(t, 3) + defer newCoordinator.Close() // For RefreshCoordinator() - broker.Returns(&ConsumerMetadataResponse{ + coordinator.Returns(&ConsumerMetadataResponse{ CoordinatorID: newCoordinator.BrokerID(), CoordinatorHost: "127.0.0.1", CoordinatorPort: newCoordinator.Port(), diff --git a/sync_producer_test.go b/sync_producer_test.go index d55444858..8d366b011 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -89,7 +89,7 @@ func TestSyncProducerTransactional(t *testing.T) { findCoordinatorResponse := new(FindCoordinatorResponse) findCoordinatorResponse.Coordinator = client.Brokers()[0] findCoordinatorResponse.Version = 1 - seedBroker.Returns(findCoordinatorResponse) + leader.Returns(findCoordinatorResponse) initProducerIdResponse := new(InitProducerIDResponse) leader.Returns(initProducerIdResponse)