Skip to content

Add list consumer groups offsets #1643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion kafka/admin/kafka.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import absolute_import

from collections import defaultdict
import copy
import logging
import socket

from kafka.vendor import six

from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
Expand All @@ -12,8 +16,9 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.version import __version__

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -585,5 +590,75 @@ def list_consumer_groups(self):
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
partitions=None):
"""Fetch Consumer Group Offsets.

Note:
This does not verify that the group_id or partitions actually exist
in the cluster.

As soon as any error is encountered, it is immediately raised.

:param group_id: The consumer group id name for which to fetch offsets.
:param group_coordinator_id: The node_id of the group's coordinator
broker. If set to None, will query the cluster to find the group
coordinator. Explicitly specifying this can be useful to prevent
that extra network round trip if you already know the group
coordinator. Default: None.
:param partitions: A list of TopicPartitions for which to fetch
offsets. On brokers >= 0.10.2, this can be set to None to fetch all
known offsets for the consumer group. Default: None.
:return dictionary: A dictionary with TopicPartition keys and
OffsetAndMetada values. Partitions that are not specified and for
which the group_id does not have a recorded offset are omitted. An
offset value of `-1` indicates the group_id has no offset for that
TopicPartition. A `-1` can only happen for partitions that are
explicitly specified.
"""
group_offsets_listing = {}
if group_coordinator_id is None:
group_coordinator_id = self._find_group_coordinator_id(group_id)
version = self._matching_api_version(OffsetFetchRequest)
if version <= 3:
if partitions is None:
if version <= 1:
raise ValueError(
"""OffsetFetchRequest_v{} requires specifying the
partitions for which to fetch offsets. Omitting the
partitions is only supported on brokers >= 0.10.2.
For details, see KIP-88.""".format(version))
topics_partitions = None
else:
# transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
request = OffsetFetchRequest[version](group_id, topics_partitions)
response = self._send_request_to_node(group_coordinator_id, request)
if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# optionally we could retry if error_type.retriable
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
# transform response into a dictionary with TopicPartition keys and
# OffsetAndMetada values--this is what the Java AdminClient returns
for topic, partitions in response.topics:
for partition, offset, metadata, error_code in partitions:
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
raise error_type(
"Unable to fetch offsets for group_id {}, topic {}, partition {}"
.format(group_id, topic, partition))
group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
else:
raise NotImplementedError(
"Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
.format(version))
return group_offsets_listing

# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.
1 change: 1 addition & 0 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
["topic", "partition", "leader", "replicas", "isr", "error"])

OffsetAndMetadata = namedtuple("OffsetAndMetadata",
# TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
["offset", "metadata"])

OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
Expand Down