Skip to content

Commit 1f73287

Browse files
Baisangjeffwidman
authored andcommitted
Make partitions_for_topic a read-through cache (#1781)
If the cluster metadata object has no info about the topic, then issue a blocking metadata call to fetch it.
1 parent edfafc0 commit 1f73287

File tree

1 file changed

+25
-8
lines changed

1 file changed

+25
-8
lines changed

kafka/consumer/group.py

+25-8
Original file line numberDiff line numberDiff line change
@@ -552,11 +552,9 @@ def committed(self, partition):
552552
committed = None
553553
return committed
554554

555-
def topics(self):
556-
"""Get all topics the user is authorized to view.
557-
558-
Returns:
559-
set: topics
555+
def _fetch_all_topit_metadata(self):
556+
"""A blocking call that fetches topic metadata for all topics in the
557+
cluster that the user is authorized to view.
560558
"""
561559
cluster = self._client.cluster
562560
if self._client._metadata_refresh_in_progress and self._client._topics:
@@ -567,18 +565,37 @@ def topics(self):
567565
future = cluster.request_update()
568566
self._client.poll(future=future)
569567
cluster.need_all_topic_metadata = stash
570-
return cluster.topics()
568+
569+
def topics(self):
570+
"""Get all topics the user is authorized to view.
571+
This will always issue a remote call to the cluster to fetch the latest
572+
information.
573+
574+
Returns:
575+
set: topics
576+
"""
577+
self._fetch_all_topic_metadata()
578+
return self._client.cluster.topics()
571579

572580
def partitions_for_topic(self, topic):
573-
"""Get metadata about the partitions for a given topic.
581+
"""This method first checks the local metadata cache for information
582+
about the topic. If the topic is not found (either because the topic
583+
does not exist, the user is not authorized to view the topic, or the
584+
metadata cache is not populated), then it will issue a metadata update
585+
call to the cluster.
574586
575587
Arguments:
576588
topic (str): Topic to check.
577589
578590
Returns:
579591
set: Partition ids
580592
"""
581-
return self._client.cluster.partitions_for_topic(topic)
593+
cluster = self._client.cluster
594+
partitions = cluster.partitions_for_topic(topic)
595+
if partitions is None:
596+
self._fetch_all_topic_metadata()
597+
partitions = cluster.partitions_for_topic(topic)
598+
return partitions
582599

583600
def poll(self, timeout_ms=0, max_records=None):
584601
"""Fetch data from assigned topics / partitions.

0 commit comments

Comments
 (0)