Skip to content

Commit

Permalink
Feature: delete consumergroups (#2040)
Browse files Browse the repository at this point in the history
* Add consumergroup related errors
* Add DeleteGroups to protocol.admin
* Implement delete_groups feature on KafkaAdminClient
  • Loading branch information
swenzel authored Sep 17, 2020
1 parent e485a6e commit 16a0b31
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 5 deletions.
93 changes: 89 additions & 4 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Array
Expand Down Expand Up @@ -337,12 +339,34 @@ def _find_coordinator_id(self, group_id):
name as a string.
:return: The node_id of the broker that is the coordinator.
"""
# Note: Java may change how this is implemented in KAFKA-6791.
future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
response = future.value
return self._find_coordinator_id_process_response(response)

def _find_many_coordinator_ids(self, group_ids):
"""Find the broker node_id of the coordinator for each of the given groups.
Sends a FindCoordinatorRequest message to the cluster for each group_id.
Will block until the FindCoordinatorResponse is received for all groups.
Any errors are immediately raised.
:param group_ids: A list of consumer group IDs. This is typically the group
name as a string.
:return: A list of tuples (group_id, node_id) where node_id is the id
of the broker that is the coordinator for the corresponding group.
"""
futures = {
group_id: self._find_coordinator_id_send_request(group_id)
for group_id in group_ids
}
self._wait_for_futures(list(futures.values()))
groups_coordinators = [
(group_id, self._find_coordinator_id_process_response(f.value))
for group_id, f in futures.items()
]
return groups_coordinators

def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
Expand Down Expand Up @@ -1261,8 +1285,69 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
response = future.value
return self._list_consumer_group_offsets_process_response(response)

# Note: the DeleteGroups request must be sent to the group's coordinator broker.
def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
    """Delete the given consumer groups.

    Note:
        This does not verify that the group ids actually exist and
        that ``group_coordinator_id`` is the correct coordinator for all
        these groups. The result needs checking for potential errors.

    :param group_ids: The consumer group ids of the groups which are to be deleted.
    :param group_coordinator_id: The node_id of the broker which is the coordinator for
        all the groups. Use only if all groups are coordinated by the same broker.
        If set to None, will query the cluster to find the coordinator for every
        single group. Explicitly specifying this can be useful to prevent extra
        network round trips if you already know the group coordinator.
        Default: None.
    :return: A list of tuples (group_id, KafkaError)
    """
    if group_coordinator_id is not None:
        # All groups share one coordinator: a single request suffices.
        futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
    else:
        # Look up each group's coordinator, then batch groups per
        # coordinator so only one request is sent to each broker.
        # (Local names deliberately do not shadow the parameters.)
        coordinator_groups = defaultdict(list)
        for group_id, coordinator_id in self._find_many_coordinator_ids(group_ids):
            coordinator_groups[coordinator_id].append(group_id)
        futures = [
            self._delete_consumer_groups_send_request(batched_group_ids, coordinator_id)
            for coordinator_id, batched_group_ids in coordinator_groups.items()
        ]

    self._wait_for_futures(futures)

    # Flatten the per-broker responses into one (group_id, error) list.
    results = []
    for future in futures:
        results.extend(self._convert_delete_groups_response(future.value))
    return results

def _convert_delete_groups_response(self, response):
    """Translate a DeleteGroupsResponse into (group_id, KafkaError) tuples.

    :param response: A DeleteGroupsResponse (v0 or v1).
    :return: A list of tuples (group_id, KafkaError class).
    :raises NotImplementedError: For response versions newer than v1.
    """
    if response.API_VERSION > 1:
        raise NotImplementedError(
            "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
            .format(response.API_VERSION))
    # v0 and v1 share the same results schema: (group_id, error_code) pairs.
    return [
        (group_id, Errors.for_code(error_code))
        for group_id, error_code in response.results
    ]

def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
    """Send a DeleteGroups request to a broker.

    :param group_ids: The consumer group ids of the groups which are to be deleted.
    :param group_coordinator_id: The node_id of the broker which is the coordinator for
        all the groups.
    :return: A message future
    """
    version = self._matching_api_version(DeleteGroupsRequest)
    # Only v0 and v1 of the request are implemented here.
    if version > 1:
        raise NotImplementedError(
            "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient."
            .format(version))
    request = DeleteGroupsRequest[version](group_ids)
    return self._send_request_to_node(group_coordinator_id, request)

def _wait_for_futures(self, futures):
while not all(future.succeeded() for future in futures):
Expand Down
12 changes: 12 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,18 @@ class SecurityDisabledError(BrokerResponseError):
description = 'Security features are disabled.'


# Broker error code 68 (NON_EMPTY_GROUP): returned e.g. by DeleteGroups when
# the target group still has active members — the group must be empty first.
class NonEmptyGroupError(BrokerResponseError):
    errno = 68
    message = 'NON_EMPTY_GROUP'
    description = 'The group is not empty.'


# Broker error code 69 (GROUP_ID_NOT_FOUND): the requested consumer group id
# does not exist on the broker.
class GroupIdNotFoundError(BrokerResponseError):
    errno = 69
    message = 'GROUP_ID_NOT_FOUND'
    description = 'The group id does not exist.'


# Client-side error (subclasses KafkaError, not BrokerResponseError) —
# presumably raised when no broker can be reached; TODO(review) confirm at call sites.
class KafkaUnavailableError(KafkaError):
    pass

Expand Down
41 changes: 41 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,3 +882,44 @@ class CreatePartitionsRequest_v1(Request):
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]


# DeleteGroups response, API key 42, version 0: a throttle time plus one
# (group_id, error_code) result per requested group.
class DeleteGroupsResponse_v0(Response):
    API_KEY = 42
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("results", Array(
            ("group_id", String("utf-8")),
            ("error_code", Int16)))
    )


# Version 1 of the DeleteGroups response is wire-identical to v0; only the
# version number changes, so the schema object is shared.
class DeleteGroupsResponse_v1(Response):
    API_KEY = 42
    API_VERSION = 1
    SCHEMA = DeleteGroupsResponse_v0.SCHEMA


# DeleteGroups request, API key 42, version 0: an array of group id strings.
# NOTE(review): the field is spelled "groups_names" here; renaming it would
# change the decoded attribute name on request objects, so it is kept as-is —
# confirm against the upstream schema before touching.
class DeleteGroupsRequest_v0(Request):
    API_KEY = 42
    API_VERSION = 0
    RESPONSE_TYPE = DeleteGroupsResponse_v0
    SCHEMA = Schema(
        ("groups_names", Array(String("utf-8")))
    )


# Version 1 of the DeleteGroups request is wire-identical to v0; only the
# version number and the response type change.
class DeleteGroupsRequest_v1(Request):
    API_KEY = 42
    API_VERSION = 1
    RESPONSE_TYPE = DeleteGroupsResponse_v1
    SCHEMA = DeleteGroupsRequest_v0.SCHEMA


# Version-indexed lookup tables (index == API version), used by the admin
# client's version negotiation (see _matching_api_version) to pick the
# newest mutually supported request/response classes.
DeleteGroupsRequest = [
    DeleteGroupsRequest_v0, DeleteGroupsRequest_v1
]

DeleteGroupsResponse = [
    DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
]
78 changes: 77 additions & 1 deletion test/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
Expand Down Expand Up @@ -142,13 +142,15 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
"""Tests that the describe consumer group call fails if the group coordinator is not available
"""
with pytest.raises(GroupCoordinatorNotAvailableError):
group_description = kafka_admin_client.describe_consumer_groups(['test'])


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
Expand Down Expand Up @@ -236,3 +238,77 @@ def consumer_thread(i, group_id):
stop[c].set()
threads[c].join()
threads[c] = None


@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages):
    """Deleting two of three empty groups removes exactly those two."""
    random_group_id = 'test-group-' + random_string(6)
    group1 = random_group_id + "_1"
    group2 = random_group_id + "_2"
    group3 = random_group_id + "_3"

    send_messages(range(0, 100), partition=0)
    # Create three groups, each consuming one message and then closing so
    # every group ends up empty (and therefore deletable).
    for group in (group1, group2, group3):
        consumer = kafka_consumer_factory(group_id=group)
        next(consumer)
        consumer.close()

    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert {group1, group2, group3} <= consumergroups

    # Delete only the first two groups.
    delete_results = dict(kafka_admin_client.delete_consumer_groups([group1, group2]))
    assert delete_results[group1] == NoError
    assert delete_results[group2] == NoError
    # group3 was not requested, so it must not appear in the results.
    assert group3 not in delete_results

    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 not in consumergroups
    assert group2 not in consumergroups
    assert group3 in consumergroups


@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages):
    """Per-group errors: empty group deletes, live group and unknown group fail."""
    random_group_id = 'test-group-' + random_string(6)
    group1 = random_group_id + "_1"
    group2 = random_group_id + "_2"
    group3 = random_group_id + "_3"

    send_messages(range(0, 100), partition=0)
    # group1: consumed and closed -> empty, should delete cleanly.
    consumer1 = kafka_consumer_factory(group_id=group1)
    next(consumer1)
    consumer1.close()

    # group2: consumer deliberately left open -> non-empty, delete must fail.
    consumer2 = kafka_consumer_factory(group_id=group2)
    next(consumer2)

    # group3 is never created.
    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 in consumergroups
    assert group2 in consumergroups
    assert group3 not in consumergroups

    delete_results = dict(
        kafka_admin_client.delete_consumer_groups([group1, group2, group3]))

    assert delete_results[group1] == NoError
    assert delete_results[group2] == NonEmptyGroupError
    assert delete_results[group3] == GroupIdNotFoundError

    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 not in consumergroups
    assert group2 in consumergroups
    assert group3 not in consumergroups

0 comments on commit 16a0b31

Please sign in to comment.