From 1878cc01756dc0fb230a1775ff330e55c7bba404 Mon Sep 17 00:00:00 2001 From: Brian Sang Date: Sat, 6 Apr 2019 22:09:49 -0700 Subject: [PATCH] Update group.partitions_for_topic to fetch topic metadata if it does not have it --- kafka/consumer/group.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index b3e182c5d..7f0e061a1 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -552,11 +552,9 @@ def committed(self, partition): committed = None return committed - def topics(self): - """Get all topics the user is authorized to view. - - Returns: - set: topics + def _fetch_all_topit_metadata(self): + """A blocking call that fetches topic metadata for all topics in the + cluster that the user is authorized to view. """ cluster = self._client.cluster if self._client._metadata_refresh_in_progress and self._client._topics: @@ -567,10 +565,24 @@ def topics(self): future = cluster.request_update() self._client.poll(future=future) cluster.need_all_topic_metadata = stash - return cluster.topics() + + def topics(self): + """Get all topics the user is authorized to view. + This will always issue a remote call to the cluster to fetch the latest + information. + + Returns: + set: topics + """ + self._fetch_all_topic_metadata() + return self._client.cluster.topics() def partitions_for_topic(self, topic): - """Get metadata about the partitions for a given topic. + """This method first checks the local metadata cache for information + about the topic. If the topic is not found (either because the topic + does not exist, the user is not authorized to view the topic, or the + metadata cache is not populated), then it will issue a metadata update + call to the cluster. Arguments: topic (str): Topic to check. @@ -578,7 +590,12 @@ def partitions_for_topic(self, topic): Returns: set: Partition ids """ - return self._client.cluster.partitions_for_topic(topic) + cluster = self._client.cluster + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + self._fetch_all_topic_metadata() + partitions = cluster.partitions_for_topic(topic) + return partitions def poll(self, timeout_ms=0, max_records=None): """Fetch data from assigned topics / partitions.