Do not use partition cache for unknown topics #372
831909c to 1024968
Added a producer test as well.
partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
client.cachedPartitionsResults[topic.Name] = partitionCache

if topic.Err != ErrUnknownTopicOrPartition && topic.Err != ErrInvalidTopic {
this doesn't delete the old cache in the case that a topic we already knew about was deleted
I think I prefer to just unconditionally delete the cache at the top of the loop for all partitions, and let it get filled in again for those we care about
Additional thought is that we still create the client.metadata map entry even if the cache is gone, which could be a little confusing (that the "real" data is [] but the cache is nil)
The cache is deleted on line 567
ah whoops, I'm dumb, sorry.
I believe we may want to similarly delete from client.metadata and guard that creation though, still trying to figure that out
So I think something like the following pseudocode:
for topics {
    delete(client.metadata, ...)
    delete(client.cachedPartitionsResults, ...)
    switch {
        NoError: break
        UnknownTopic: set retry, continue
        LeaderNotAvailable: set retry, break
        InvalidTopic: set err, continue
        default: set err, break
    }
    // fill in client.metadata and partitionCache unconditionally
}
Does this make sense?
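The pseudocode above can be sketched as runnable Go. This is only an illustration under stated assumptions, not the library's actual code: `topicMetadata`, the `client` struct, and the `update` method are hypothetical stand-ins, the error values are local placeholders, and `nil` is used in place of `ErrNoError`.

```go
package main

import (
	"errors"
	"fmt"
)

// Local placeholder errors (assumption: they stand in for the library's values).
var (
	ErrUnknownTopicOrPartition = errors.New("unknown topic or partition")
	ErrLeaderNotAvailable      = errors.New("leader not available")
	ErrInvalidTopic            = errors.New("invalid topic")
)

// topicMetadata is a simplified, hypothetical metadata response entry.
type topicMetadata struct {
	Name       string
	Err        error // nil stands in for ErrNoError
	Partitions []int32
}

// client holds only the two maps discussed in the review (simplified types).
type client struct {
	metadata                map[string][]int32
	cachedPartitionsResults map[string][]int32
}

// update applies a metadata response. It reports whether a retry is
// warranted and the last error that should be surfaced to the caller.
func (c *client) update(topics []topicMetadata) (retry bool, err error) {
	for _, t := range topics {
		// Drop stale state first, so a deleted topic leaves nothing behind.
		delete(c.metadata, t.Name)
		delete(c.cachedPartitionsResults, t.Name)

		switch t.Err {
		case nil:
			// no error: store the results below
		case ErrUnknownTopicOrPartition:
			retry = true
			continue // do not store anything for this topic
		case ErrLeaderNotAvailable:
			retry = true // partial results are still stored below
		case ErrInvalidTopic:
			err = t.Err
			continue // do not retry, do not store
		default:
			err = t.Err
		}

		// Fill in both maps for every topic that reached this point.
		c.metadata[t.Name] = t.Partitions
		c.cachedPartitionsResults[t.Name] = t.Partitions
	}
	return retry, err
}

func main() {
	c := &client{
		metadata:                map[string][]int32{"gone": {0}},
		cachedPartitionsResults: map[string][]int32{"gone": {0}},
	}
	retry, err := c.update([]topicMetadata{
		{Name: "gone", Err: ErrUnknownTopicOrPartition},
		{Name: "ok", Partitions: []int32{0, 1}},
	})
	fmt.Println(retry, err, len(c.metadata))
}
```

Deleting both entries at the top of the loop means the `continue` branches cannot leave an old cache entry behind for a topic that has since been deleted.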
I think the issue is that we don't cache the error value for cachedPartitions(), but we do for cachedLeader(). cachedMetadata() doesn't either, but the struct that it returns contains the Err value.
Don't forget to put this down as a bugfix in the changelog :)
62a9151 to 5d4f0f8
Updated.
switch topic.Err {
case ErrNoError:
    break
case ErrLeaderNotAvailable, ErrUnknownTopicOrPartition:
case ErrInvalidTopic: // don't retry, don't store partial results
    err = topic.Err
This makes this method return ErrInvalidTopic, even if all the other topics are OK.
which is fine, I think - we only get this if you explicitly pass in "SomeInvalidTopic\" to RefreshMetadata, so returning ErrInvalidTopic in that case is correct, even if not all topics were invalid
In that case we should probably also set the error on ErrUnknownTopicOrPartition, so that gets returned to the user as well.
Ah, I thought we checked the error return before the retry return, but we do not, so this is OK, good catch 👍
Fixed.
5d4f0f8 to 7f2cdf6
}

if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
    t.Log("Expected ErrInvalidTopic, found", err)
should these be t.Error instead of t.Log?
This is failing on the 0.8.1 CI.
Or is that why they're logs, I think you're ahead of me :)
Interestingly, when using an invalid topic name, Kafka returns
Anyway, if CI is happy, I am happy now
The checks must be in a funny order. And of course pre-0.8.2 it just returns Unknown regardless. CI is green, ship it.
Do not use partition cache for unknown topics
If we set the partition cache, we end up returning an empty list later on, which causes a weird error message.
@Shopify/kafka for review
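A minimal sketch of the failure mode described above, assuming a plain map as the cache. The `lookup` helper and the placeholder error are hypothetical and are not part of the library: a cached empty list counts as a hit, so the caller never learns that the topic is unknown.

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder error (assumption: stands in for the library's value).
var ErrUnknownTopicOrPartition = errors.New("unknown topic or partition")

// lookup is a hypothetical helper: any cache hit is returned as-is,
// and only a miss is reported as an unknown topic.
func lookup(cache map[string][]int32, topic string) ([]int32, error) {
	if partitions, ok := cache[topic]; ok {
		return partitions, nil // a hit, even when the list is empty
	}
	return nil, ErrUnknownTopicOrPartition
}

func main() {
	// Cache populated for an unknown topic: the empty list hides the problem.
	poisoned := map[string][]int32{"missing": {}}
	p, err := lookup(poisoned, "missing")
	fmt.Println(len(p), err) // prints "0 <nil>"

	// No cache entry: the unknown-topic condition is visible to the caller.
	clean := map[string][]int32{}
	_, err = lookup(clean, "missing")
	fmt.Println(err) // prints "unknown topic or partition"
}
```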