Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not use partition cache for unknown topics #372

Merged
merged 2 commits into from
Mar 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Bug Fixes:
([#369](https://github.com/Shopify/sarama/pull/369)).
- Fix a condition where the producer's internal control messages could have
gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
- Fix an issue where invalid partition lists would be cached when asking for
metadata for a non-existing topic ([#372](https://github.com/Shopify/sarama/pull/372)).


#### Version 1.0.0 (2015-03-17)

Expand Down
24 changes: 19 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
switch err.(type) {
case nil:
// valid response, use it
retry, err := client.update(response)
retry, err := client.updateMetadata(response)

if len(retry) > 0 {
if retriesRemaining <= 0 {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) update(data *MetadataResponse) ([]string, error) {
func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()

Expand All @@ -554,23 +554,37 @@ func (client *client) update(data *MetadataResponse) ([]string, error) {

var err error
for _, topic := range data.Topics {

delete(client.metadata, topic.Name)
delete(client.cachedPartitionsResults, topic.Name)

switch topic.Err {
case ErrNoError:
break
case ErrLeaderNotAvailable, ErrUnknownTopicOrPartition:
case ErrInvalidTopic: // don't retry, don't store partial results
err = topic.Err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes this method return ErrInvalidTopic, even if all the other topics are OK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which is fine, I think - we only get this if you explicitly pass in "SomeInvalidTopic" to RefreshMetadata, so returning ErrInvalidTopic in that case is correct, even if not all topics were invalid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we should probably also set the error on ErrUnknownTopicOrPartition, so that gets returned to the user as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I thought we checked the error return before the retry return, but we do not, so this is OK, good catch 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

continue
case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
err = topic.Err
toRetry[topic.Name] = true
default:
continue
	case ErrLeaderNotAvailable: // retry, but store partial partition results
toRetry[topic.Name] = true
break
default: // don't retry, don't store partial results
Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
err = topic.Err
continue
}

client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
delete(client.cachedPartitionsResults, topic.Name)
for _, partition := range topic.Partitions {
client.metadata[topic.Name][partition.ID] = partition
if partition.Err == ErrLeaderNotAvailable {
toRetry[topic.Name] = true
}
}

var partitionCache [maxPartitionIndex][]int32
partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
Expand Down
54 changes: 54 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,60 @@ func TestCachedPartitions(t *testing.T) {
safeClose(t, client)
}

func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
seedBroker := newMockBroker(t, 1)

replicas := []int32{seedBroker.BrokerID()}

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
seedBroker.Returns(metadataResponse)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this response can just be empty, there's no need for these

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see you actually test this later, nevermind

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broker is still not necessary in any of these responses

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was a good idea to ensure we are now not always busting the cache.


config := NewConfig()
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse)

partitions, err := client.Partitions("unknown")

if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}
if partitions != nil {
t.Errorf("Should return nil as partition list, found %v", partitions)
}

// Should still use the cache of a known topic
partitions, err = client.Partitions("my_topic")
if err != nil {
t.Errorf("Expected no error, found %v", err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataResponse)

// Should not use cache for unknown topic
partitions, err = client.Partitions("unknown")
if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}
if partitions != nil {
t.Errorf("Should return nil as partition list, found %v", partitions)
}

seedBroker.Close()
safeClose(t, client)
}

func TestClientSeedBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)

Expand Down
61 changes: 61 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,48 @@ func TestFuncConnectionFailure(t *testing.T) {
}
}

func TestFuncClientMetadata(t *testing.T) {
checkKafkaAvailability(t)

config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 10 * time.Millisecond
client, err := NewClient(kafkaBrokers, config)
if err != nil {
t.Fatal(err)
}

if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, got", err)
}

if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, got", err)
}

if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, got", err)
}

partitions, err := client.Partitions("multi_partition")
if err != nil {
t.Error(err)
}
if len(partitions) != 2 {
t.Errorf("Expected multi_partition topic to have 2 partitions, found %v", partitions)
}

partitions, err = client.Partitions("single_partition")
if err != nil {
t.Error(err)
}
if len(partitions) != 1 {
t.Errorf("Expected single_partition topic to have 1 partitions, found %v", partitions)
}

safeClose(t, client)
}

func TestFuncProducing(t *testing.T) {
config := NewConfig()
testProducingMessages(t, config)
Expand Down Expand Up @@ -124,6 +166,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
}
}

func TestProducingToInvalidTopic(t *testing.T) {
checkKafkaAvailability(t)

producer, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found", err)
}

safeClose(t, producer)
}

func testProducingMessages(t *testing.T, config *Config) {
checkKafkaAvailability(t)

Expand Down
22 changes: 14 additions & 8 deletions metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,31 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
}

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
var match *TopicMetadata
func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding an API requires a minor version bump in semver (1.1.0 vs 1.0.1). I'm torn between just making this private or eating the version bump.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just do the version bump; there's no good reason this method should be hidden if AddTopicPartition is exported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we doc the new API in the changelog then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really feel the need to; it will just show up in the godoc.

var tmatch *TopicMetadata

for _, tm := range m.Topics {
if tm.Name == topic {
match = tm
tmatch = tm
goto foundTopic
}
}

match = new(TopicMetadata)
match.Name = topic
m.Topics = append(m.Topics, match)
tmatch = new(TopicMetadata)
tmatch.Name = topic
m.Topics = append(m.Topics, tmatch)

foundTopic:

tmatch.Err = err
return tmatch
}

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
tmatch := m.AddTopic(topic, ErrNoError)
var pmatch *PartitionMetadata

for _, pm := range match.Partitions {
for _, pm := range tmatch.Partitions {
if pm.ID == partition {
pmatch = pm
goto foundPartition
Expand All @@ -209,7 +215,7 @@ foundTopic:

pmatch = new(PartitionMetadata)
pmatch.ID = partition
match.Partitions = append(match.Partitions, pmatch)
tmatch.Partitions = append(tmatch.Partitions, pmatch)

foundPartition:

Expand Down
30 changes: 30 additions & 0 deletions sync_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,36 @@ func TestConcurrentSyncProducer(t *testing.T) {
seedBroker.Close()
}

func TestSyncProducerToNonExistingTopic(t *testing.T) {
broker := newMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
broker.Returns(metadataResponse)

config := NewConfig()
config.Metadata.Retry.Max = 0
config.Producer.Retry.Max = 0

producer, err := NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataResponse = new(MetadataResponse)
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
broker.Returns(metadataResponse)

_, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
if err != ErrUnknownTopicOrPartition {
t.Error("Expected ErrUnknownTopicOrPartition, found:", err)
}

safeClose(t, producer)
broker.Close()
}

// This example shows the basic usage pattern of the SyncProducer.
func ExampleSyncProducer() {
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
Expand Down
1 change: 1 addition & 0 deletions vagrant/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ num.partitions=2
# more easily.
default.replication.factor=2

auto.create.topics.enable=false
delete.topic.enable=true

############################# Log Flush Policy #############################
Expand Down