From 6f6b8e7aaad13647affd2eab0fa245c497773e34 Mon Sep 17 00:00:00 2001 From: Willem van Bergen Date: Fri, 20 Mar 2015 12:23:41 -0400 Subject: [PATCH] Add some tests on how the client should handle errors in the metadata response. --- client_test.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/client_test.go b/client_test.go index e778842df4..97df324277 100644 --- a/client_test.go +++ b/client_test.go @@ -199,6 +199,100 @@ func TestClientMetadata(t *testing.T) { safeClose(t, client) } +func TestClientReceivingUnknownTopic(t *testing.T) { + seedBroker := newMockBroker(t, 1) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse1) + + config := NewConfig() + config.Metadata.Retry.Max = 0 + client, err := NewClient([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddTopic("new_topic", ErrUnknownTopicOrPartition) + seedBroker.Returns(metadataResponse2) + + if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition { + t.Error("ErrUnknownTopicOrPartition expected, got", err) + } + + // If we are asking for the leader of a partition of the non-existing topic. + // we will request metadata again. + + metadataResponse3 := new(MetadataResponse) + metadataResponse3.AddTopic("new_topic", ErrUnknownTopicOrPartition) + seedBroker.Returns(metadataResponse3) + + if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition { + t.Error("Expected ErrUnknownTopicOrPartition, got", err) + } + + safeClose(t, client) + seedBroker.Close() +} + +func TestClientReceivingPartialMetadata(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 5) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) + seedBroker.Returns(metadataResponse1) + + config := NewConfig() + config.Metadata.Retry.Max = 0 + client, err := NewClient([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()} + + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddTopic("new_topic", ErrLeaderNotAvailable) + metadataResponse2.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError) + metadataResponse2.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable) + seedBroker.Returns(metadataResponse2) + + if err := client.RefreshMetadata("new_topic"); err != nil { + t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error") + } + + // Even though the metadata was incomplete, we should be able to get the leader of a partition + // for which we did get a useful response, without doing additional requests. + + partition0Leader, err := client.Leader("new_topic", 0) + if err != nil { + t.Error(err) + } else if partition0Leader.Addr() != leader.Addr() { + t.Error("Unexpected leader returned", partition0Leader.Addr()) + } + + // If we are asking for the leader of a partition that didn't have a leader before, + // we will do another metadata request. + + metadataResponse3 := new(MetadataResponse) + metadataResponse3.AddTopic("new_topic", ErrLeaderNotAvailable) + metadataResponse3.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError) + metadataResponse3.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable) + seedBroker.Returns(metadataResponse3) + + // Still no leader for the partition, so asking for it should return an error. + _, err = client.Leader("new_topic", 1) + if err != ErrLeaderNotAvailable { + t.Error("Expected ErrLeaderNotAvailable, got", err) + } + + safeClose(t, client) + seedBroker.Close() + leader.Close() +} + func TestClientRefreshBehaviour(t *testing.T) { seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 5)