
Implement KIP-204: DeleteRecords API #969

Merged 6 commits on Jan 29, 2024
1 change: 1 addition & 0 deletions CHANGES/969.feature
@@ -0,0 +1 @@
Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin)
3 changes: 2 additions & 1 deletion aiokafka/admin/__init__.py
@@ -1,5 +1,6 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"]
62 changes: 61 additions & 1 deletion aiokafka/admin/client.py
@@ -6,12 +6,18 @@

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.errors import (
IncompatibleBrokerVersion,
LeaderNotAvailableError,
NotLeaderForPartitionError,
for_code,
)
from aiokafka.protocol.admin import (
AlterConfigsRequest,
ApiVersionRequest_v0,
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteRecordsRequest,
DeleteTopicsRequest,
DescribeConfigsRequest,
DescribeGroupsRequest,
@@ -24,6 +30,7 @@

from .config_resource import ConfigResource, ConfigResourceType
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

log = logging.getLogger(__name__)

@@ -605,3 +612,56 @@
offset_plus_meta = OffsetAndMetadata(offset, metadata)
response_dict[tp] = offset_plus_meta
return response_dict

async def delete_records(
self,
records_to_delete: Dict[TopicPartition, RecordsToDelete],
timeout_ms: Optional[int] = None,
) -> Dict[TopicPartition, int]:
"""Delete records from partitions.

:param records_to_delete: A map of RecordsToDelete for each TopicPartition
:param timeout_ms: Milliseconds to wait for the deletion to complete.
        :return: A mapping of TopicPartition to that partition's new low
            watermark, i.e. the offset of the first record remaining after deletion.
"""
version = self._matching_api_version(DeleteRecordsRequest)

metadata = await self._get_cluster_metadata()

self._client.cluster.update_metadata(metadata)

requests = defaultdict(lambda: defaultdict(list))
responses = {}

for tp, records in records_to_delete.items():
leader = self._client.cluster.leader_for_partition(tp)
if leader is None:
raise NotLeaderForPartitionError()

elif leader == -1:
raise LeaderNotAvailableError()

requests[leader][tp.topic].append((tp.partition, records))

req_cls = DeleteRecordsRequest[version]

for leader, delete_request in requests.items():
request = req_cls(
self._convert_records_to_delete(delete_request),
timeout_ms or self._request_timeout_ms,
)
response = await self._client.send(leader, request)
for topic, partitions in response.topics:
for partition_index, low_watermark, error_code in partitions:
if error_code:
err = for_code(error_code)
raise err

responses[TopicPartition(topic, partition_index)] = low_watermark
return responses

@staticmethod
def _convert_records_to_delete(
records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
):
return [
(topic, [(partition, rec.before_offset) for partition, rec in records])
for topic, records in records_to_delete.items()
]
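To make the grouping and flattening in delete_records / _convert_records_to_delete easier to follow, here is a standalone sketch of the transformation with made-up offsets (it mirrors the helper's list comprehension, not any other aiokafka internals):

    from aiokafka.admin import RecordsToDelete

    # Per-leader payload as built in delete_records():
    # {topic_name: [(partition, RecordsToDelete), ...]}
    delete_request = {
        "my-topic": [
            (0, RecordsToDelete(before_offset=42)),
            (1, RecordsToDelete(before_offset=7)),
        ],
    }

    # Flattened into the wire-level structure DeleteRecordsRequest expects:
    # [(topic, [(partition, before_offset), ...]), ...]
    converted = [
        (topic, [(partition, rec.before_offset) for partition, rec in records])
        for topic, records in delete_request.items()
    ]
    assert converted == [("my-topic", [(0, 42), (1, 7)])]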
12 changes: 12 additions & 0 deletions aiokafka/admin/records_to_delete.py
@@ -0,0 +1,12 @@
class RecordsToDelete:
"""A class for deleting records on existing topics.
Arguments:
before_offset (int):
delete all the records before the given offset
"""

def __init__(
self,
before_offset,
):
self.before_offset = before_offset
3 changes: 2 additions & 1 deletion aiokafka/cluster.py
@@ -254,7 +254,8 @@ def update_metadata(self, metadata):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
for p_error, partition, leader, replicas, isr in partitions:
# Starting with v5, MetadataResponse contains more than 5 fields
for p_error, partition, leader, replicas, isr, *_ in partitions:
_new_partitions[topic][partition] = PartitionMetadata(
topic=topic,
partition=partition,
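A small illustration of why the starred target is needed here: v5+ MetadataResponse partition entries carry extra trailing fields (offline_replicas, and more in later versions), and `*_` absorbs them. The tuples below are made up for the example:

    # v0-v4 partition metadata: exactly five fields.
    old_entry = (0, 3, 1001, [1001, 1002], [1001, 1002])
    # v5+ appends offline_replicas (and possibly more in later versions).
    new_entry = (0, 3, 1001, [1001, 1002], [1001, 1002], [])

    for entry in (old_entry, new_entry):
        # The old 5-name unpacking raises ValueError on new_entry;
        # the starred form handles both shapes.
        p_error, partition, leader, replicas, isr, *_ = entry
        assert (partition, leader) == (3, 1001)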
123 changes: 123 additions & 0 deletions aiokafka/protocol/admin.py
@@ -1276,3 +1276,126 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


class DeleteRecordsResponse_v0(Response):
API_KEY = 21
API_VERSION = 0
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
),
),
),
),
)


class DeleteRecordsResponse_v1(Response):
API_KEY = 21
API_VERSION = 1
SCHEMA = DeleteRecordsResponse_v0.SCHEMA


class DeleteRecordsResponse_v2(Response):
API_KEY = 21
API_VERSION = 2
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


class DeleteRecordsRequest_v0(Request):
API_KEY = 21
API_VERSION = 0
RESPONSE_TYPE = DeleteRecordsResponse_v0
SCHEMA = Schema(
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("offset", Int64),
),
),
),
),
("timeout_ms", Int32),
)


class DeleteRecordsRequest_v1(Request):
API_KEY = 21
API_VERSION = 1
RESPONSE_TYPE = DeleteRecordsResponse_v1
SCHEMA = DeleteRecordsRequest_v0.SCHEMA


class DeleteRecordsRequest_v2(Request):
API_KEY = 21
API_VERSION = 2
FLEXIBLE_VERSION = True
RESPONSE_TYPE = DeleteRecordsResponse_v2
SCHEMA = Schema(
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("offset", Int64),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("timeout_ms", Int32),
("tags", TaggedFields),
)


DeleteRecordsRequest = [
DeleteRecordsRequest_v0,
DeleteRecordsRequest_v1,
# FIXME: We have some problems with `TaggedFields`
# DeleteRecordsRequest_v2,
]

DeleteRecordsResponse = [
DeleteRecordsResponse_v0,
DeleteRecordsResponse_v1,
# FIXME: We have some problems with `TaggedFields`
# DeleteRecordsResponse_v2,
]
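For reference, a sketch of how these request classes are typically built and serialized. It assumes the usual Struct constructor that accepts schema fields as keyword arguments and an instance-level encode(), as the other admin requests in this module are used:

    from aiokafka.protocol.admin import DeleteRecordsRequest

    # Newest enabled version (v2 stays commented out until TaggedFields is fixed).
    version = len(DeleteRecordsRequest) - 1
    req_cls = DeleteRecordsRequest[version]

    # topics: [(name, [(partition_index, offset), ...])], plus timeout_ms.
    request = req_cls(
        topics=[("my-topic", [(0, 42)])],
        timeout_ms=30000,
    )
    payload = request.encode()  # bytes, ready to be framed and sent to the leader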
1 change: 1 addition & 0 deletions aiokafka/protocol/types.py
@@ -313,6 +313,7 @@ def encode(self, value):
return UnsignedVarInt32.encode(len(value) + 1) + value


# FIXME: TaggedFields doesn't seem to work properly so they should be avoided
class TaggedFields(AbstractType):
@classmethod
def decode(cls, data):
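For whoever picks up the FIXME: per KIP-482, a tagged-field section is an unsigned-varint count followed by (tag, size, data) triples, and an empty section is the single byte 0x00. A hand-rolled sketch of that layout, for comparison with the class above (this is not the existing implementation):

    def encode_unsigned_varint(value: int) -> bytes:
        # 7 data bits per byte, MSB set while more bytes follow.
        out = bytearray()
        while True:
            byte = value & 0x7F
            value >>= 7
            if value:
                out.append(byte | 0x80)
            else:
                out.append(byte)
                return bytes(out)

    def encode_tagged_fields(fields: dict[int, bytes]) -> bytes:
        # KIP-482: count, then (tag, size, data) per field, tags in ascending order.
        out = encode_unsigned_varint(len(fields))
        for tag in sorted(fields):
            data = fields[tag]
            out += encode_unsigned_varint(tag) + encode_unsigned_varint(len(data)) + data
        return out

    assert encode_tagged_fields({}) == b"\x00"  # the common "no tagged fields" marker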
37 changes: 36 additions & 1 deletion tests/test_admin.py
@@ -1,6 +1,6 @@
import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete
from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType
from aiokafka.consumer import AIOKafkaConsumer
from aiokafka.producer import AIOKafkaProducer
@@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self):
assert resp[tp].offset == msg.offset + 1
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
assert resp[tp].offset == msg.offset + 1

@kafka_versions(">=1.1.0")
@run_until_complete
async def test_delete_records(self):
admin = await self.create_admin()

await admin.create_topics([NewTopic(self.topic, 1, 1)])

async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
first_message = await producer.send_and_wait(
self.topic, partition=0, value=b"some-message"
)
await producer.send_and_wait(
self.topic, partition=0, value=b"other-message"
)

await admin.delete_records(
{
TopicPartition(self.topic, 0): RecordsToDelete(
before_offset=first_message.offset + 1
)
}
)

consumer = AIOKafkaConsumer(
self.topic,
bootstrap_servers=self.hosts,
enable_auto_commit=False,
auto_offset_reset="earliest",
)
await consumer.start()
self.add_cleanup(consumer.stop)

msg = await consumer.getone()
assert msg.value == b"other-message"
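A possible follow-up assertion for this test, assuming AIOKafkaConsumer.beginning_offsets is available on the tested broker versions (it reports the partition's low watermark, which should have advanced past the deleted record):

    tp = TopicPartition(self.topic, 0)
    beginnings = await consumer.beginning_offsets([tp])
    assert beginnings[tp] == first_message.offset + 1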