[Feat] Allow consumers to refresh the topic lists #818
lib/kafka/consumer.rb
Outdated
# Hash storing topics that are already being subscribed
# When subscribing to a new topic, if it's already being subscribed before, skip it
@subscribed_topics = {}
Can you use `Set` here instead?
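A minimal sketch of what this suggestion could look like, assuming the collection only needs to answer "is this topic already subscribed?". The `TopicRegistry` class and its method names are hypothetical and are not part of the PR; only `Set` and `Set#add?` come from Ruby's standard library.

```ruby
require "set"

# Hypothetical stand-in for the consumer's bookkeeping; not the PR's code.
class TopicRegistry
  def initialize
    @subscribed_topics = Set.new
  end

  # Returns true if the topic was newly added, false if it was already present.
  # Set#add? returns nil when the element is already in the set.
  def subscribe(topic)
    !@subscribed_topics.add?(topic).nil?
  end

  def topics
    @subscribed_topics.to_a
  end
end
```

With a `Set`, the "skip it if already subscribed" check and the insertion happen in one call, so there is no placeholder value to maintain as there would be with a `Hash`.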
@dasch I updated my PR based on your recommendation.
lib/kafka/consumer.rb
Outdated
      sleep refresh_topic_interval
    end
  end
  @thread.abort_on_exception = true
This part is problematic. `@thread` is only set on the first invocation of `subscribe`, which can be called multiple times. There should either be a single thread that consults a set of regexes or one thread per regex. Also, the thread or threads need to be killed on consumer shutdown.
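To illustrate the first option (a single thread that consults a set of regexes and is stopped on shutdown), here is a rough sketch. `TopicRefresher`, `add`, `stop`, and the `on_refresh` block are hypothetical names invented for this example; they are not ruby-kafka APIs, and the block stands in for whatever the consumer would do with the regexes.

```ruby
require "set"

# Hypothetical helper, not part of ruby-kafka: one background thread serves
# every registered regex and is stopped explicitly on shutdown.
class TopicRefresher
  def initialize(interval:, &on_refresh)
    @interval = interval
    @on_refresh = on_refresh
    @regexes = Set.new
    @mutex = Mutex.new
    @thread = nil
  end

  # Registers a regex. The thread is started once and reused by later calls.
  def add(regex)
    @mutex.synchronize do
      @regexes << regex
      @thread ||= Thread.new { run }
    end
  end

  # Kills the refresh thread and waits for it; intended for consumer shutdown.
  def stop
    thread = @mutex.synchronize do
      current = @thread
      @thread = nil
      current
    end
    return unless thread

    thread.kill
    thread.join
  end

  def running?
    @mutex.synchronize { !@thread.nil? && @thread.alive? }
  end

  private

  def run
    loop do
      # Take a snapshot so the callback runs without holding the lock.
      regexes = @mutex.synchronize { @regexes.to_a }
      @on_refresh.call(regexes)
      sleep @interval
    end
  end
end
```

Calling `add` several times registers several regexes but still leaves exactly one thread, and `stop` gives the consumer a single place to tear it down.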
lib/kafka/consumer.rb
Outdated
subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
if refresh_topic_interval > 0
  @thread ||= Thread.new do
Spawning a thread here is error-prone and hard to control. And as @dasch said, if the user calls this method multiple times, it may spawn multiple threads doing the same thing. In this context, the consumer only needs to know about the refreshed set of topics before it starts a new fetch. You could take another approach:
- At the beginning of `fetch_batches`, check whether the topic cache has expired. If it has, fetch the new list of topics from the cluster. If the topics stay the same, continue.
- If the list of topics changes:
  - Subscribe the new topic to the fetcher
  - Subscribe the new topic to the group manager
  - Subscribe the new topic to the offset manager
  - Mark the cluster information as stale, just to be sure
  - Force a group join
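The steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: `TopicCache`, `refresh_if_expired`, `list_topics`, `on_change`, and `clock` are hypothetical names. `list_topics` stands in for a cluster metadata call, and `on_change` stands in for the fetcher, group manager, and offset manager subscriptions plus the forced group join.

```ruby
require "set"

# Hypothetical sketch of a thread-free refresh; none of these names are
# ruby-kafka APIs.
class TopicCache
  def initialize(regex:, ttl:, list_topics:, on_change:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @regex = regex
    @ttl = ttl
    @list_topics = list_topics
    @on_change = on_change
    @clock = clock
    @topics = Set.new
    @expires_at = nil
  end

  # Intended to be called at the start of each fetch.
  # Returns the topics that newly match the regex (empty if nothing changed).
  def refresh_if_expired
    now = @clock.call
    return [] if @expires_at && now < @expires_at

    @expires_at = now + @ttl
    matched = @list_topics.call.grep(@regex).to_set
    added = matched - @topics
    return [] if added.empty?

    @topics.merge(added)
    # Stand-in for: subscribe on fetcher / group manager / offset manager,
    # mark cluster info stale, force a group join.
    @on_change.call(added.to_a)
    added.to_a
  end
end
```

Because the check runs on the consumer's own thread between fetches, no extra synchronization or shutdown handling is needed, which is the main appeal of this approach over a background thread.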
Thanks @dasch and @nguyenquangminh0711 for pointing out the problem. I took the approach Minh suggested and removed the thread.
Looks good to me, just a small comment. @nguyenquangminh0711 do you want to review again?
lib/kafka/consumer.rb
Outdated
@subscribed_topics.keys.each do |topic_or_regex|
  default_offset = @subscribed_topics[topic_or_regex][:default_offset]
  start_from_beginning = @subscribed_topics[topic_or_regex][:start_from_beginning]
  max_bytes_per_partition = @subscribed_topics[topic_or_regex][:max_bytes_per_partition]
Would it not make sense to use `each` here instead, e.g.

    @subscribed_topics.each do |topic_or_regex, topic_config|
      default_offset = topic_config.fetch(:default_offset) # also I prefer `fetch` because it crashes explicitly
      ...
    end
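For reference, a small self-contained example of the difference this suggestion points at. The `subscription_summaries` method and the sample configuration are made up for illustration; only `Hash#each`-style iteration over key/value pairs and `Hash#fetch` are taken from the comment above.

```ruby
# Hypothetical helper: iterates key/value pairs directly instead of
# looking each key up again, and uses fetch so a missing option fails loudly.
def subscription_summaries(subscribed_topics)
  subscribed_topics.map do |topic_or_regex, topic_config|
    default_offset = topic_config.fetch(:default_offset)
    max_bytes = topic_config.fetch(:max_bytes_per_partition)
    "#{topic_or_regex}: offset=#{default_offset}, max_bytes=#{max_bytes}"
  end
end

config = {
  "greetings" => { default_offset: :earliest, max_bytes_per_partition: 1_048_576 }
}
puts subscription_summaries(config)
```

With `[]`, a misspelled or missing key silently yields `nil` and the error surfaces later; with `fetch`, Ruby raises `KeyError` at the point of the lookup.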
Done
I think this PR is good to go 🎉 🎉
Thanks!