Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature delete consumergroups #2040

Merged
merged 12 commits into from
Sep 17, 2020
93 changes: 89 additions & 4 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Array
Expand Down Expand Up @@ -337,12 +339,34 @@ def _find_coordinator_id(self, group_id):
name as a string.
:return: The node_id of the broker that is the coordinator.
"""
# Note: Java may change how this is implemented in KAFKA-6791.
future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
response = future.value
return self._find_coordinator_id_process_response(response)

def _find_many_coordinator_ids(self, group_ids):
    """Look up the coordinating broker node_id for each of the given groups.

    Fires off one FindCoordinatorRequest per group_id and blocks until
    every FindCoordinatorResponse has been received. Any error in a
    response is raised immediately.

    :param group_ids: A list of consumer group IDs. This is typically the
        group name as a string.
    :return: A list of tuples (group_id, node_id) where node_id is the id
        of the broker that is the coordinator for the corresponding group.
    """
    # One in-flight request per distinct group id; a dict keeps the
    # group -> future association (duplicates collapse, as before).
    futures_by_group = {}
    for group_id in group_ids:
        futures_by_group[group_id] = self._find_coordinator_id_send_request(group_id)
    self._wait_for_futures(list(futures_by_group.values()))
    groups_coordinators = []
    for group_id, future in futures_by_group.items():
        node_id = self._find_coordinator_id_process_response(future.value)
        groups_coordinators.append((group_id, node_id))
    return groups_coordinators

def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.

Expand Down Expand Up @@ -1261,8 +1285,69 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
response = future.value
return self._list_consumer_group_offsets_process_response(response)

# Note: the DeleteGroups request must be sent to the group's coordinator.
def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
    """Delete the given consumer groups.

    Note:
        This does not verify that the group ids actually exist and
        group_coordinator_id is the correct coordinator for all these groups.

        The result needs checking for potential errors.

    :param group_ids: The consumer group ids of the groups which are to be deleted.
    :param group_coordinator_id: The node_id of the broker which is the coordinator for
        all the groups. Use only if all groups are coordinated by the same broker.
        If set to None, will query the cluster to find the coordinator for every single group.
        Explicitly specifying this can be useful to prevent
        extra network round trips if you already know the group
        coordinator. Default: None.
    :return: A list of tuples (group_id, KafkaError)
    """
    if group_coordinator_id is not None:
        # Caller asserts that one broker coordinates all groups: one request.
        futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
    else:
        # Find each group's coordinator, then batch the groups per
        # coordinator so only one request is sent to each broker.
        # (Renamed loop variables so they no longer shadow the parameters.)
        coordinator_groups = defaultdict(list)
        for group_id, coordinator_id in self._find_many_coordinator_ids(group_ids):
            coordinator_groups[coordinator_id].append(group_id)
        futures = [
            self._delete_consumer_groups_send_request(batch, coordinator_id)
            for coordinator_id, batch in coordinator_groups.items()
        ]

    self._wait_for_futures(futures)

    results = []
    for future in futures:
        results.extend(self._convert_delete_groups_response(future.value))
    return results

def _convert_delete_groups_response(self, response):
    """Translate a DeleteGroupsResponse into (group_id, KafkaError) tuples.

    :param response: The DeleteGroupsResponse received from the broker.
    :return: A list of tuples (group_id, KafkaError), one per group in the
        response, with each error code mapped to its KafkaError class.
    :raises NotImplementedError: If the response API version is unsupported.
    """
    # Guard clause: only v0/v1 responses are understood.
    if response.API_VERSION > 1:
        raise NotImplementedError(
            "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
            .format(response.API_VERSION))
    return [
        (group_id, Errors.for_code(error_code))
        for group_id, error_code in response.results
    ]

def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
    """Send a DeleteGroups request to a broker.

    :param group_ids: The consumer group ids of the groups which are to be deleted.
    :param group_coordinator_id: The node_id of the broker which is the coordinator for
        all the groups.
    :return: A message future
    """
    # Guard clause: only DeleteGroups v0/v1 are implemented.
    version = self._matching_api_version(DeleteGroupsRequest)
    if version > 1:
        raise NotImplementedError(
            "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient."
            .format(version))
    request = DeleteGroupsRequest[version](group_ids)
    return self._send_request_to_node(group_coordinator_id, request)

def _wait_for_futures(self, futures):
while not all(future.succeeded() for future in futures):
Expand Down
12 changes: 12 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,18 @@ class SecurityDisabledError(BrokerResponseError):
description = 'Security features are disabled.'


class NonEmptyGroupError(BrokerResponseError):
    """Broker error 68 (NON_EMPTY_GROUP): the group is not empty."""
    errno = 68
    message = 'NON_EMPTY_GROUP'
    description = 'The group is not empty.'


class GroupIdNotFoundError(BrokerResponseError):
    """Broker error 69 (GROUP_ID_NOT_FOUND): the group id does not exist."""
    errno = 69
    message = 'GROUP_ID_NOT_FOUND'
    description = 'The group id does not exist.'


class KafkaUnavailableError(KafkaError):
pass

Expand Down
41 changes: 41 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,3 +882,44 @@ class CreatePartitionsRequest_v1(Request):
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]


class DeleteGroupsResponse_v0(Response):
    """DeleteGroups v0 response: per-group (group_id, error_code) results."""
    API_KEY = 42
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("results", Array(
            ("group_id", String("utf-8")),
            ("error_code", Int16)))
    )


class DeleteGroupsResponse_v1(Response):
    """DeleteGroups v1 response: wire schema identical to v0."""
    API_KEY = 42
    API_VERSION = 1
    SCHEMA = DeleteGroupsResponse_v0.SCHEMA


class DeleteGroupsRequest_v0(Request):
    """DeleteGroups v0 request: an array of group names to delete."""
    API_KEY = 42
    API_VERSION = 0
    RESPONSE_TYPE = DeleteGroupsResponse_v0
    SCHEMA = Schema(
        # NOTE(review): "groups_names" looks like a typo for "group_names";
        # renaming would change the attribute exposed on request objects, so
        # confirm no callers rely on the current name before fixing.
        ("groups_names", Array(String("utf-8")))
    )


class DeleteGroupsRequest_v1(Request):
    """DeleteGroups v1 request: wire schema identical to v0."""
    API_KEY = 42
    API_VERSION = 1
    RESPONSE_TYPE = DeleteGroupsResponse_v1
    SCHEMA = DeleteGroupsRequest_v0.SCHEMA


# Version-indexed lists (index == API version); KafkaAdminClient uses these
# via _matching_api_version() to pick the request class to send.
DeleteGroupsRequest = [
    DeleteGroupsRequest_v0, DeleteGroupsRequest_v1
]

DeleteGroupsResponse = [
    DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
]
78 changes: 77 additions & 1 deletion test/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
Expand Down Expand Up @@ -142,13 +142,15 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
"""Tests that the describe consumer group call fails if the group coordinator is not available
"""
with pytest.raises(GroupCoordinatorNotAvailableError):
group_description = kafka_admin_client.describe_consumer_groups(['test'])


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
Expand Down Expand Up @@ -236,3 +238,77 @@ def consumer_thread(i, group_id):
stop[c].set()
threads[c].join()
threads[c] = None


@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages):
    """Deleting two of three empty groups removes exactly those two."""
    random_group_id = 'test-group-' + random_string(6)
    group1 = random_group_id + "_1"
    group2 = random_group_id + "_2"
    group3 = random_group_id + "_3"

    send_messages(range(0, 100), partition=0)
    # Create three groups by consuming one message in each, then close the
    # consumers so the groups become empty (and therefore deletable).
    for group in (group1, group2, group3):
        consumer = kafka_consumer_factory(group_id=group)
        next(consumer)
        consumer.close()

    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 in consumergroups
    assert group2 in consumergroups
    assert group3 in consumergroups

    delete_results = dict(kafka_admin_client.delete_consumer_groups([group1, group2]))
    assert delete_results[group1] == NoError
    assert delete_results[group2] == NoError
    assert group3 not in delete_results

    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 not in consumergroups
    assert group2 not in consumergroups
    assert group3 in consumergroups


@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages):
    """Per-group errors are reported for non-empty and non-existent groups."""
    random_group_id = 'test-group-' + random_string(6)
    group1 = random_group_id + "_1"
    group2 = random_group_id + "_2"
    group3 = random_group_id + "_3"

    send_messages(range(0, 100), partition=0)
    # group1: consumed then closed -> empty, deletion should succeed.
    consumer1 = kafka_consumer_factory(group_id=group1)
    next(consumer1)
    consumer1.close()

    # group2: consumer intentionally left open -> group stays non-empty.
    consumer2 = kafka_consumer_factory(group_id=group2)
    next(consumer2)

    # group3: never created -> deletion should report an unknown group id.
    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 in consumergroups
    assert group2 in consumergroups
    assert group3 not in consumergroups

    delete_results = dict(kafka_admin_client.delete_consumer_groups([group1, group2, group3]))

    assert delete_results[group1] == NoError
    assert delete_results[group2] == NonEmptyGroupError
    assert delete_results[group3] == GroupIdNotFoundError

    consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
    assert group1 not in consumergroups
    assert group2 in consumergroups
    assert group3 not in consumergroups