Skip to content

Commit

Permalink
Add some tests on how the client should handle errors in the metadata…
Browse files Browse the repository at this point in the history
… response.
  • Loading branch information
wvanbergen committed Mar 20, 2015
1 parent 6dff870 commit 6f6b8e7
Showing 1 changed file with 94 additions and 0 deletions.
94 changes: 94 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,100 @@ func TestClientMetadata(t *testing.T) {
safeClose(t, client)
}

func TestClientReceivingUnknownTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

config := NewConfig()
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddTopic("new_topic", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
t.Error("ErrUnknownTopicOrPartition expected, got", err)
}

// If we are asking for the leader of a partition of the non-existing topic.
// we will request metadata again.

metadataResponse3 := new(MetadataResponse)
metadataResponse3.AddTopic("new_topic", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse3)

if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, got", err)
}

safeClose(t, client)
seedBroker.Close()
}

func TestClientReceivingPartialMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
seedBroker.Returns(metadataResponse1)

config := NewConfig()
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddTopic("new_topic", ErrLeaderNotAvailable)
metadataResponse2.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
metadataResponse2.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata("new_topic"); err != nil {
t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
}

// Even though the metadata was incomplete, we should be able to get the leader of a partition
// for which we did get a useful response, without doing additional requests.

partition0Leader, err := client.Leader("new_topic", 0)
if err != nil {
t.Error(err)
} else if partition0Leader.Addr() != leader.Addr() {
t.Error("Unexpected leader returned", partition0Leader.Addr())
}

// If we are asking for the leader of a partition that didn't have a leader before,
// we will do another metadata request.

metadataResponse3 := new(MetadataResponse)
metadataResponse3.AddTopic("new_topic", ErrLeaderNotAvailable)
metadataResponse3.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
metadataResponse3.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
seedBroker.Returns(metadataResponse3)

// Still no leader for the partition, so asking for it should return an error.
_, err = client.Leader("new_topic", 1)
if err != ErrLeaderNotAvailable {
t.Error("Expected ErrLeaderNotAvailable, got", err)
}

safeClose(t, client)
seedBroker.Close()
leader.Close()
}

func TestClientRefreshBehaviour(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
Expand Down

0 comments on commit 6f6b8e7

Please sign in to comment.