From 181f956e9565fe99835fa913ea8cc2181ea69600 Mon Sep 17 00:00:00 2001 From: Vincent Maurin Date: Thu, 25 Jan 2024 16:59:39 +0100 Subject: [PATCH 1/6] Implement KIP-202 : DeleteRecords API When doing stream processing, it is convinient to use "transient" topic : * retention time is infinite * records get deleted when consumed The java kafka streams client is using the deleteRecords of the admin client to perform this operation. It is lacking in aiokafka The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API refs #967 --- aiokafka/admin/__init__.py | 3 +- aiokafka/admin/client.py | 72 ++++++++++++++++- aiokafka/admin/records_to_delete.py | 12 +++ aiokafka/protocol/admin.py | 121 ++++++++++++++++++++++++++++ tests/test_admin.py | 37 ++++++++- 5 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 aiokafka/admin/records_to_delete.py diff --git a/aiokafka/admin/__init__.py b/aiokafka/admin/__init__.py index 61913cc8..75d807b8 100644 --- a/aiokafka/admin/__init__.py +++ b/aiokafka/admin/__init__.py @@ -1,5 +1,6 @@ from .client import AIOKafkaAdminClient from .new_partitions import NewPartitions from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete -__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"] +__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"] diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index d8b08752..2838b238 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -6,12 +6,18 @@ from aiokafka import __version__ from aiokafka.client import AIOKafkaClient -from aiokafka.errors import IncompatibleBrokerVersion, for_code +from aiokafka.errors import ( + IncompatibleBrokerVersion, + LeaderNotAvailableError, + NotLeaderForPartitionError, + for_code, +) from aiokafka.protocol.admin import ( AlterConfigsRequest, ApiVersionRequest_v0, CreatePartitionsRequest, CreateTopicsRequest, + DeleteRecordsRequest, DeleteTopicsRequest, DescribeConfigsRequest, DescribeGroupsRequest, @@ -24,6 +30,7 @@ from .config_resource import ConfigResource, ConfigResourceType from .new_topic import NewTopic +from .records_to_delete import RecordsToDelete log = logging.getLogger(__name__) @@ -605,3 +612,66 @@ async def list_consumer_group_offsets( offset_plus_meta = OffsetAndMetadata(offset, metadata) response_dict[tp] = offset_plus_meta return response_dict + + async def delete_records( + self, + records_to_delete: Dict[TopicPartition, RecordsToDelete], + timeout_ms: Optional[int] = None, + ) -> Dict[TopicPartition, int]: + """Delete records from partitions. + + :param records_to_delete: A map of RecordsToDelete for each TopicPartition + :param timeout_ms: Milliseconds to wait for the deletion to complete. + :return: Appropriate version of DeleteRecordsResponse class. + """ + version = self._matching_api_version(DeleteRecordsRequest) + + if self._version_info[MetadataRequest[0].API_KEY] < (0, 10): + metadata_request = MetadataRequest[0]([]) + else: + metadata_request = MetadataRequest[1](None) + + metadata = await self._send_request(metadata_request) + + self._client.cluster.update_metadata(metadata) + + requests = defaultdict(lambda: defaultdict(list)) + responses = {} + + for tp, records in records_to_delete.items(): + leader = self._client.cluster.leader_for_partition(tp) + if leader is None: + raise NotLeaderForPartitionError() + elif leader == -1: + raise LeaderNotAvailableError() + requests[leader][tp.topic].append((tp.partition, records)) + + req_cls = DeleteRecordsRequest[version] + + for leader, delete_request in requests.items(): + request = req_cls( + self._convert_records_to_delete(delete_request), + timeout_ms or self._request_timeout_ms, + {}, + ) + response = await self._client.send(leader, request) + for topic, partitions in response.topics: + for partition_index, low_watermark, error_code in partitions: + if error_code: + err = for_code(error_code) + raise err + responses[TopicPartition(topic, partition_index)] = low_watermark + return responses + + @staticmethod + def _convert_records_to_delete( + records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]], + ): + return [ + ( + topic, + [(partition, rec.before_offset, {}) for partition, rec in records], + {}, + ) + for topic, records in records_to_delete.items() + ] diff --git a/aiokafka/admin/records_to_delete.py b/aiokafka/admin/records_to_delete.py new file mode 100644 index 00000000..76edc146 --- /dev/null +++ b/aiokafka/admin/records_to_delete.py @@ -0,0 +1,12 @@ +class RecordsToDelete: + """A class for deleting records on existing topics. + Arguments: + before_offset (int): + delete all the records before the given offset + """ + + def __init__( + self, + before_offset, + ): + self.before_offset = before_offset diff --git a/aiokafka/protocol/admin.py b/aiokafka/protocol/admin.py index 0e5eace6..c130346a 100644 --- a/aiokafka/protocol/admin.py +++ b/aiokafka/protocol/admin.py @@ -1276,3 +1276,124 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] + + +class DeleteRecordsResponse_v0(Response): + API_KEY = 21 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ), + ), + ), + ), + ) + + +class DeleteRecordsResponse_v1(Response): + API_KEY = 21 + API_VERSION = 1 + SCHEMA = DeleteRecordsResponse_v0.SCHEMA + + +class DeleteRecordsResponse_v2(Response): + API_KEY = 21 + API_VERSION = 2 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("low_watermark", Int64), + ("error_code", Int16), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ) + + +class DeleteRecordsRequest_v0(Request): + API_KEY = 21 + API_VERSION = 0 + RESPONSE_TYPE = DeleteRecordsResponse_v0 + SCHEMA = Schema( + ( + "topics", + Array( + ("name", String("utf-8")), + ( + "partitions", + Array( + ("partition_index", Int32), + ("offset", Int64), + ), + ), + ), + ), + ("timeout_ms", Int32), + ) + + +class DeleteRecordsRequest_v1(Request): + API_KEY = 21 + API_VERSION = 1 + RESPONSE_TYPE = DeleteRecordsResponse_v1 + SCHEMA = DeleteRecordsRequest_v0.SCHEMA + + +class DeleteRecordsRequest_v2(Request): + API_KEY = 21 + API_VERSION = 2 + FLEXIBLE_VERSION = True + RESPONSE_TYPE = DeleteRecordsResponse_v2 + SCHEMA = Schema( + ( + "topics", + CompactArray( + ("name", CompactString("utf-8")), + ( + "partitions", + CompactArray( + ("partition_index", Int32), + ("offset", Int64), + ("tags", TaggedFields), + ), + ), + ("tags", TaggedFields), + ), + ), + ("timeout_ms", Int32), + ("tags", TaggedFields), + ) + + +DeleteRecordsRequest = [ + DeleteRecordsRequest_v0, + DeleteRecordsRequest_v1, + DeleteRecordsRequest_v2, +] + +DeleteRecordsResponse = [ + DeleteRecordsResponse_v0, + DeleteRecordsResponse_v1, + DeleteRecordsResponse_v2, +] diff --git a/tests/test_admin.py b/tests/test_admin.py index fcafeff7..a81b9663 100644 --- a/tests/test_admin.py +++ b/tests/test_admin.py @@ -1,6 +1,6 @@ import asyncio -from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic +from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType from aiokafka.consumer import AIOKafkaConsumer from aiokafka.producer import AIOKafkaProducer @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self): assert resp[tp].offset == msg.offset + 1 resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp]) assert resp[tp].offset == msg.offset + 1 + + @kafka_versions(">=1.1.0") + @run_until_complete + async def test_delete_records(self): + admin = await self.create_admin() + + await admin.create_topics([NewTopic(self.topic, 1, 1)]) + + async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer: + first_message = await producer.send_and_wait( + self.topic, partition=0, value=b"some-message" + ) + await producer.send_and_wait( + self.topic, partition=0, value=b"other-message" + ) + + await admin.delete_records( + { + TopicPartition(self.topic, 0): RecordsToDelete( + before_offset=first_message.offset + 1 + ) + } + ) + + consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.hosts, + enable_auto_commit=False, + auto_offset_reset="earliest", + ) + await consumer.start() + self.add_cleanup(consumer.stop) + + msg = await consumer.getone() + assert msg.value == b"other-message" From a4aad3a9fbef323436ccd46f65c052c6ced117e0 Mon Sep 17 00:00:00 2001 From: Vincent Maurin Date: Mon, 29 Jan 2024 14:57:23 +0100 Subject: [PATCH 2/6] Use common method to get metadata --- aiokafka/admin/client.py | 7 +------ aiokafka/cluster.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index 2838b238..782a74c1 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -626,12 +626,7 @@ async def delete_records( """ version = self._matching_api_version(DeleteRecordsRequest) - if self._version_info[MetadataRequest[0].API_KEY] < (0, 10): - metadata_request = MetadataRequest[0]([]) - else: - metadata_request = MetadataRequest[1](None) - - metadata = await self._send_request(metadata_request) + metadata = await self._get_cluster_metadata() self._client.cluster.update_metadata(metadata) diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index e0cc1cf7..02cd5a83 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -254,7 +254,7 @@ def update_metadata(self, metadata): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: _new_partitions[topic] = {} - for p_error, partition, leader, replicas, isr in partitions: + for p_error, partition, leader, replicas, isr, *_ in partitions: _new_partitions[topic][partition] = PartitionMetadata( topic=topic, partition=partition, From 927472e709c5a4e3c3f52f14ffaa74d35076ff61 Mon Sep 17 00:00:00 2001 From: Vincent Maurin Date: Mon, 29 Jan 2024 15:08:01 +0100 Subject: [PATCH 3/6] Explain the unpacking catch all --- aiokafka/cluster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index 02cd5a83..08b59d2d 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -254,6 +254,7 @@ def update_metadata(self, metadata): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: _new_partitions[topic] = {} + # Starting with v5, MetadataResponse contains more than 5 fields for p_error, partition, leader, replicas, isr, *_ in partitions: _new_partitions[topic][partition] = PartitionMetadata( topic=topic, From ccf85244827c2f85d64087f6d1c0d66bbf93e3f0 Mon Sep 17 00:00:00 2001 From: Vincent Maurin Date: Mon, 29 Jan 2024 17:39:51 +0100 Subject: [PATCH 4/6] Remove usage of TaggedFields TaggedFields doesn't seem to work properly at the moment. Maybe they should be replaced by an implementation closer to the java client with their "flexibleVersions" --- aiokafka/admin/client.py | 6 ++---- aiokafka/protocol/admin.py | 6 ++++-- aiokafka/protocol/types.py | 1 + 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index 782a74c1..4ca231f4 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -646,8 +646,7 @@ async def delete_records( for leader, delete_request in requests.items(): request = req_cls( self._convert_records_to_delete(delete_request), - timeout_ms or self._request_timeout_ms, - {}, + timeout_ms or self._request_timeout_ms ) response = await self._client.send(leader, request) for topic, partitions in response.topics: @@ -665,8 +664,7 @@ def _convert_records_to_delete( return [ ( topic, - [(partition, rec.before_offset, {}) for partition, rec in records], - {}, + [(partition, rec.before_offset) for partition, rec in records] ) for topic, records in records_to_delete.items() ] diff --git a/aiokafka/protocol/admin.py b/aiokafka/protocol/admin.py index c130346a..133c9a2a 100644 --- a/aiokafka/protocol/admin.py +++ b/aiokafka/protocol/admin.py @@ -1389,11 +1389,13 @@ class DeleteRecordsRequest_v2(Request): DeleteRecordsRequest = [ DeleteRecordsRequest_v0, DeleteRecordsRequest_v1, - DeleteRecordsRequest_v2, + # FIXME: We have some problems with `TaggedFields` + # DeleteRecordsRequest_v2, ] DeleteRecordsResponse = [ DeleteRecordsResponse_v0, DeleteRecordsResponse_v1, - DeleteRecordsResponse_v2, + # FIXME: We have some problems with `TaggedFields` + # DeleteRecordsResponse_v2, ] diff --git a/aiokafka/protocol/types.py b/aiokafka/protocol/types.py index f1e106c5..cc20c6bd 100644 --- a/aiokafka/protocol/types.py +++ b/aiokafka/protocol/types.py @@ -313,6 +313,7 @@ def encode(self, value): return UnsignedVarInt32.encode(len(value) + 1) + value +# FIXME: TaggedFields doesn't seem to work properly so they should be avoided class TaggedFields(AbstractType): @classmethod def decode(cls, data): From 3f2e2e2d59bc4eb7634b0f2df233a2190f4374fc Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Mon, 29 Jan 2024 19:59:11 +0200 Subject: [PATCH 5/6] Fix linting errors (format) --- aiokafka/admin/client.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index 4ca231f4..375f470c 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -646,7 +646,7 @@ async def delete_records( for leader, delete_request in requests.items(): request = req_cls( self._convert_records_to_delete(delete_request), - timeout_ms or self._request_timeout_ms + timeout_ms or self._request_timeout_ms, ) response = await self._client.send(leader, request) for topic, partitions in response.topics: @@ -662,9 +662,6 @@ def _convert_records_to_delete( records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]], ): return [ - ( - topic, - [(partition, rec.before_offset) for partition, rec in records] - ) + (topic, [(partition, rec.before_offset) for partition, rec in records]) for topic, records in records_to_delete.items() ] From 1395718ae72005489618a345b6279f5c9757f515 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Mon, 29 Jan 2024 20:53:53 +0200 Subject: [PATCH 6/6] Add change log --- CHANGES/969.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGES/969.feature diff --git a/CHANGES/969.feature b/CHANGES/969.feature new file mode 100644 index 00000000..96c0793c --- /dev/null +++ b/CHANGES/969.feature @@ -0,0 +1 @@ +Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin)