Commit 431fd87

Fix list_consumer_groups() to query all brokers
Previously, this only queried the controller. In actuality, the Kafka protocol requires that the client query all brokers in order to get the full list of consumer groups.

Note: The Java code (as best I can tell) doesn't allow limiting this to specific brokers. On the surface, this makes sense: you typically don't care about specific brokers. However, the inverse is true for consumer groups: they care about knowing their group coordinator so they don't have to repeatedly query to find it. In fact, a Kafka broker will only return the groups that it's a coordinator for. While this is an implementation detail that is not guaranteed by the upstream broker code, and technically should not be relied upon, I think it very unlikely to change.

So monitoring scripts that fetch the offsets of, or describe, every consumer group in the cluster can simply issue one call per broker to identify all the coordinators, rather than having to issue one call per consumer group. For an ad-hoc script this doesn't matter, but for a monitoring script that runs every couple of minutes, this can be a big deal. I know in the situations where I will use this, this matters more to me than the risk of the interface unexpectedly breaking.
1 parent d67157c commit 431fd87
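
The one-call-per-broker idea from the commit message could look roughly like the sketch below. It is illustrative only: `FakeAdmin` is a hypothetical stand-in for the admin client so the example runs without a cluster, and `groups_by_coordinator` is a helper name invented here, not part of the library. It relies on the same non-guaranteed broker behavior the message describes (a broker only lists the groups it coordinates).

```python
class FakeAdmin:
    """Hypothetical stand-in for the admin client (not the real class).

    It mimics only the list_consumer_groups(broker_ids=...) signature
    introduced by this commit, backed by an in-memory dict.
    """

    def __init__(self, groups_by_broker):
        # Maps broker node id -> list of (group name, protocol type) tuples.
        self._groups_by_broker = groups_by_broker

    def list_consumer_groups(self, broker_ids=None):
        if broker_ids is None:
            broker_ids = list(self._groups_by_broker)
        groups = set()
        for broker_id in broker_ids:
            groups.update(self._groups_by_broker[broker_id])
        return list(groups)


def groups_by_coordinator(admin, broker_ids):
    """Map each broker id to the consumer groups that broker reported.

    Issues one call per broker instead of one coordinator lookup per
    consumer group. Assumes a broker only returns groups it coordinates.
    """
    return {
        broker_id: sorted(admin.list_consumer_groups(broker_ids=[broker_id]))
        for broker_id in broker_ids
    }


admin = FakeAdmin({
    0: [("billing", "consumer")],
    1: [("audit", "consumer"), ("legacy", "")],
})
coordinators = groups_by_coordinator(admin, [0, 1])
```

A monitoring script could then send its per-group requests straight to the broker listed for each group, at the cost of depending on behavior the broker does not promise.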

kafka/admin/kafka.py

Lines changed: 39 additions & 4 deletions
@@ -487,18 +487,53 @@ def describe_consumer_groups(self, group_ids):
                 .format(version))
         return self._send(request)
 
-    def list_consumer_groups(self):
+    def list_consumer_groups(self, broker_ids=None):
         """List all consumer groups known to the cluster.
 
-        :return: Appropriate version of ListGroupsResponse class
+        This returns a list of Consumer Group tuples. The tuples are
+        composed of the consumer group name and the consumer group protocol
+        type.
+
+        Only consumer groups that store their offsets in Kafka are returned.
+        The protocol type will be an empty string for groups created using
+        Kafka < 0.9 APIs because, although they store their offsets in Kafka,
+        they don't use Kafka for group coordination. For groups created using
+        Kafka >= 0.9, the protocol type will typically be "consumer".
+
+        As soon as any error is encountered, it is immediately raised.
+
+        :param broker_ids: A list of broker node_ids to query for consumer
+            groups. If set to None, will query all brokers in the cluster.
+            Explicitly specifying broker(s) can be useful for determining which
+            consumer groups are coordinated by those broker(s). Default: None
+        :return list: List of tuples of Consumer Groups.
+        :exception GroupCoordinatorNotAvailableError: The coordinator is not
+            available, so cannot process requests.
+        :exception GroupLoadInProgressError: The coordinator is loading and
+            hence can't process requests.
         """
+        # While we return a list, internally use a set to prevent duplicates
+        # because if a group coordinator fails after being queried, and its
+        # consumer groups move to new brokers that haven't yet been queried,
+        # then the same group could be returned by multiple brokers.
+        consumer_groups = set()
+        if broker_ids is None:
+            broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
         version = self._matching_api_version(ListGroupsRequest)
-        if version <= 1:
+        if version <= 2:
             request = ListGroupsRequest[version]()
+            for broker_id in broker_ids:
+                response = self._send_request_to_node(broker_id, request)
+                error_type = Errors.for_code(response.error_code)
+                if error_type is not Errors.NoError:
+                    raise error_type(
+                        "Request '{}' failed with response '{}'."
+                        .format(request, response))
+                consumer_groups.update(response.groups)
         else:
             raise NotImplementedError(
                 "Support for ListGroups v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        return list(consumer_groups)
 
     # delete groups protocol not implemented
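
The aggregation pattern in the new method body (query each broker, raise on the first error, merge results into a set) can be tried in isolation with the sketch below. Everything in it is a stand-in for illustration: `BrokerError`, `collect_groups`, and the `send_to_broker` callable are hypothetical names, and a plain integer error code replaces the library's error classes.

```python
class BrokerError(Exception):
    """Hypothetical error type standing in for the library's error classes."""


def collect_groups(broker_ids, send_to_broker):
    """Merge the group listings of several brokers into one list.

    send_to_broker is an assumed callable: broker_id -> (error_code, groups),
    where error_code 0 means success and groups is a list of tuples.
    """
    # A set absorbs duplicates: if a coordinator fails mid-scan and its
    # groups move to a broker not yet queried, a group can be seen twice.
    consumer_groups = set()
    for broker_id in broker_ids:
        error_code, groups = send_to_broker(broker_id)
        if error_code != 0:
            # Fail fast: the first error aborts the whole scan.
            raise BrokerError(
                "Broker {} returned error code {}".format(broker_id, error_code))
        consumer_groups.update(groups)
    return list(consumer_groups)


# Simulated responses: ("orders", "consumer") is reported by two brokers.
responses = {
    0: (0, [("orders", "consumer")]),
    1: (0, [("orders", "consumer"), ("metrics", "consumer")]),
    2: (15, []),
}

merged = sorted(collect_groups([0, 1], responses.get))

try:
    collect_groups([0, 1, 2], responses.get)
    failed = False
except BrokerError:
    failed = True
```

One consequence of failing fast is that a single unavailable broker discards the results already gathered from the others, which matches the docstring's statement that any error is raised immediately.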
