Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement list_topics, describe_topics, and describe_cluster #1993

Merged
merged 1 commit into from
Feb 6, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,48 @@ def delete_topics(self, topics, timeout_ms=None):
.format(version))
return response

# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the controller
def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
version = self._matching_api_version(MetadataRequest)
if version <= 3:
if auto_topic_creation:
raise IncompatibleBrokerVersion(
"auto_topic_creation requires MetadataRequest >= v4, which"
" is not supported by Kafka {}"
.format(self.config['api_version']))

# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
request = MetadataRequest[version](topics=topics)
elif version <= 5:
request = MetadataRequest[version](
topics=topics,
allow_auto_topic_creation=auto_topic_creation
)

future = self._send_request_to_node(
self._client.least_loaded_node(),
request
)
self._wait_for_futures([future])
return future.value

def list_topics(self):
metadata = self._get_cluster_metadata(topics=None)
obj = metadata.to_object()
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']

def describe_cluster(self):
metadata = self._get_cluster_metadata()
obj = metadata.to_object()
obj.pop('topics') # We have 'describe_topics' for this
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the Java client include or exclude the topics as part of the cluster metadata call?

I know it seems redundant but dropping them here forces the caller to issue two API calls to the Kafka broker if they want both... not ideal for monitoring scripts that may run frequently. Although I suppose if someone really cares they can call _get_cluster_metadata() directly.

But really I just think we should follow the Java client here, so if they drop the topics, then we should too...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, based on their javadoc they exclude topics as part of the metadata call, though it also looks like they avoid fetching the topics at all.

It's easy enough to throw in a def describe_cluster_and_topics if you want to deviate from the java client a bit?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave as-is if it follows Java, that will be good enough for now and anyone who wants the speed boost can live dangerously with _get_cluster_metadata().

Also, my reading of that Java code is they're passing an empty list for the topics, which maps to fetching all topics... but I could be wrong it's been over a year since I looked at the Metadata protocol definition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a bit for myself as well, I'm testing it now. I also just noticed that I had missed the "use controller for describe topics" comment, I'm adding that now

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, I missed that too!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FTR this was fixed in #1995

return obj

@staticmethod
def _convert_describe_acls_response_to_acls(describe_response):
Expand Down