Skip to content

Commit

Permalink
Update group.partitions_for_topic to fetch topic metadata if it does …
Browse files Browse the repository at this point in the history
…not have it
  • Loading branch information
Baisang committed Apr 7, 2019
1 parent f6a8a38 commit 097a7c0
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,8 @@ def committed(self, partition):
committed = None
return committed

def topics(self):
"""Get all topics the user is authorized to view.
Returns:
set: topics
def _fetch_all_topic_metadata(self):
"""A blocking call that fetches topic metadata for all topics in the cluster.
"""
cluster = self._client.cluster
if self._client._metadata_refresh_in_progress and self._client._topics:
Expand All @@ -567,18 +564,34 @@ def topics(self):
future = cluster.request_update()
self._client.poll(future=future)
cluster.need_all_topic_metadata = stash

def topics(self):
"""Get all topics the user is authorized to view.
Returns:
set: topics
"""
cluster = self._client.cluster
self._fetch_all_topic_metadata()
return cluster.topics()

def partitions_for_topic(self, topic):
"""Get metadata about the partitions for a given topic.
This method will issue a remote call to the server if it does not already
have any metadata about the given topic.
Arguments:
topic (str): Topic to check.
Returns:
set: Partition ids
"""
return self._client.cluster.partitions_for_topic(topic)
cluster = self._client.cluster
partitions = cluster.partitions_for_topic(topic)
if not partitions:
self._fetch_all_topic_metadata()
partitions = cluster.partitions_for_topic(topic)
return partitions

def poll(self, timeout_ms=0, max_records=None):
"""Fetch data from assigned topics / partitions.
Expand Down

0 comments on commit 097a7c0

Please sign in to comment.