From 577a071c13d455df84e6a7ca31cf229130427085 Mon Sep 17 00:00:00 2001 From: Swen Wenzel <5111028+swenzel@users.noreply.github.com> Date: Thu, 17 Sep 2020 18:17:35 +0200 Subject: [PATCH] Feature: delete consumergroups (#2040) * Add consumergroup related errors * Add DeleteGroups to protocol.admin * Implement delete_groups feature on KafkaAdminClient --- kafka/admin/client.py | 93 ++++++++++++++++++++++++++++++++-- kafka/errors.py | 12 +++++ kafka/protocol/admin.py | 41 +++++++++++++++ test/test_admin_integration.py | 78 +++++++++++++++++++++++++++- 4 files changed, 219 insertions(+), 5 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 1b91e1b80..97fe73acb 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -19,7 +19,9 @@ from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, + DeleteGroupsRequest +) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.types import Array @@ -337,12 +339,34 @@ def _find_coordinator_id(self, group_id): name as a string. :return: The node_id of the broker that is the coordinator. """ - # Note: Java may change how this is implemented in KAFKA-6791. future = self._find_coordinator_id_send_request(group_id) self._wait_for_futures([future]) response = future.value return self._find_coordinator_id_process_response(response) + def _find_many_coordinator_ids(self, group_ids): + """Find the broker node_id of the coordinator for each of the given groups. + + Sends a FindCoordinatorRequest message to the cluster for each group_id. + Will block until the FindCoordinatorResponse is received for all groups. + Any errors are immediately raised. + + :param group_ids: A list of consumer group IDs. This is typically the group + name as a string. + :return: A list of tuples (group_id, node_id) where node_id is the id + of the broker that is the coordinator for the corresponding group. + """ + futures = { + group_id: self._find_coordinator_id_send_request(group_id) + for group_id in group_ids + } + self._wait_for_futures(list(futures.values())) + groups_coordinators = [ + (group_id, self._find_coordinator_id_process_response(f.value)) + for group_id, f in futures.items() + ] + return groups_coordinators + def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. @@ -1261,8 +1285,69 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, response = future.value return self._list_consumer_group_offsets_process_response(response) - # delete groups protocol not yet implemented - # Note: send the request to the group's coordinator. + def delete_consumer_groups(self, group_ids, group_coordinator_id=None): + """Delete Consumer Group Offsets for given consumer groups. + + Note: + This does not verify that the group ids actually exist and + group_coordinator_id is the correct coordinator for all these groups. + + The result needs checking for potential errors. + + :param group_ids: The consumer group ids of the groups which are to be deleted. + :param group_coordinator_id: The node_id of the broker which is the coordinator for + all the groups. Use only if all groups are coordinated by the same broker. + If set to None, will query the cluster to find the coordinator for every single group. + Explicitly specifying this can be useful to prevent + that extra network round trips if you already know the group + coordinator. Default: None. + :return: A list of tuples (group_id, KafkaError) + """ + if group_coordinator_id is not None: + futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] + else: + groups_coordinators = defaultdict(list) + for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids): + groups_coordinators[group_coordinator_id].append(group_id) + futures = [ + self._delete_consumer_groups_send_request(group_ids, group_coordinator_id) + for group_coordinator_id, group_ids in groups_coordinators.items() + ] + + self._wait_for_futures(futures) + + results = [] + for f in futures: + results.extend(self._convert_delete_groups_response(f.value)) + return results + + def _convert_delete_groups_response(self, response): + if response.API_VERSION <= 1: + results = [] + for group_id, error_code in response.results: + results.append((group_id, Errors.for_code(error_code))) + return results + else: + raise NotImplementedError( + "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + + def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): + """Send a DeleteGroups request to a broker. + + :param group_ids: The consumer group ids of the groups which are to be deleted. + :param group_coordinator_id: The node_id of the broker which is the coordinator for + all the groups. + :return: A message future + """ + version = self._matching_api_version(DeleteGroupsRequest) + if version <= 1: + request = DeleteGroupsRequest[version](group_ids) + else: + raise NotImplementedError( + "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(group_coordinator_id, request) def _wait_for_futures(self, futures): while not all(future.succeeded() for future in futures): diff --git a/kafka/errors.py b/kafka/errors.py index 2c1df82de..b33cf51e2 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -449,6 +449,18 @@ class SecurityDisabledError(BrokerResponseError): description = 'Security features are disabled.' +class NonEmptyGroupError(BrokerResponseError): + errno = 68 + message = 'NON_EMPTY_GROUP' + description = 'The group is not empty.' + + +class GroupIdNotFoundError(BrokerResponseError): + errno = 69 + message = 'GROUP_ID_NOT_FOUND' + description = 'The group id does not exist.' + + class KafkaUnavailableError(KafkaError): pass diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index af88ea473..f3b691a5f 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -882,3 +882,44 @@ class CreatePartitionsRequest_v1(Request): CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, ] + +class DeleteGroupsResponse_v0(Response): + API_KEY = 42 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ("results", Array( + ("group_id", String("utf-8")), + ("error_code", Int16))) + ) + + +class DeleteGroupsResponse_v1(Response): + API_KEY = 42 + API_VERSION = 1 + SCHEMA = DeleteGroupsResponse_v0.SCHEMA + + +class DeleteGroupsRequest_v0(Request): + API_KEY = 42 + API_VERSION = 0 + RESPONSE_TYPE = DeleteGroupsResponse_v0 + SCHEMA = Schema( + ("groups_names", Array(String("utf-8"))) + ) + + +class DeleteGroupsRequest_v1(Request): + API_KEY = 42 + API_VERSION = 1 + RESPONSE_TYPE = DeleteGroupsResponse_v1 + SCHEMA = DeleteGroupsRequest_v0.SCHEMA + + +DeleteGroupsRequest = [ + DeleteGroupsRequest_v0, DeleteGroupsRequest_v1 +] + +DeleteGroupsResponse = [ + DeleteGroupsResponse_v0, DeleteGroupsResponse_v1 +] diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index dc04537d5..06c40a223 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -7,7 +7,7 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) -from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) +from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -142,6 +142,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): with pytest.raises(ValueError): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_does_not_exist(kafka_admin_client): """Tests that the describe consumer group call fails if the group coordinator is not available @@ -149,6 +150,7 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client): with pytest.raises(GroupCoordinatorNotAvailableError): group_description = kafka_admin_client.describe_consumer_groups(['test']) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): """Tests that the describe consumer group call returns valid consumer group information @@ -236,3 +238,77 @@ def consumer_thread(i, group_id): stop[c].set() threads[c].join() threads[c] = None + + +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") +def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): + random_group_id = 'test-group-' + random_string(6) + group1 = random_group_id + "_1" + group2 = random_group_id + "_2" + group3 = random_group_id + "_3" + + send_messages(range(0, 100), partition=0) + consumer1 = kafka_consumer_factory(group_id=group1) + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id=group2) + next(consumer2) + consumer2.close() + + consumer3 = kafka_consumer_factory(group_id=group3) + next(consumer3) + consumer3.close() + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in consumergroups + assert group2 in consumergroups + assert group3 in consumergroups + + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2]) + } + assert delete_results[group1] == NoError + assert delete_results[group2] == NoError + assert group3 not in delete_results + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in consumergroups + assert group2 not in consumergroups + assert group3 in consumergroups + + +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") +def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): + random_group_id = 'test-group-' + random_string(6) + group1 = random_group_id + "_1" + group2 = random_group_id + "_2" + group3 = random_group_id + "_3" + + send_messages(range(0, 100), partition=0) + consumer1 = kafka_consumer_factory(group_id=group1) + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id=group2) + next(consumer2) + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in consumergroups + assert group2 in consumergroups + assert group3 not in consumergroups + + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3]) + } + + assert delete_results[group1] == NoError + assert delete_results[group2] == NonEmptyGroupError + assert delete_results[group3] == GroupIdNotFoundError + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in consumergroups + assert group2 in consumergroups + assert group3 not in consumergroups