Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a delete records api #1710

Merged
merged 49 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
863a1e8
Added a delete records api
PratRanj07 Feb 14, 2024
93fc6c3
Corrected semaphore errors
PratRanj07 Feb 14, 2024
7336986
Corrected another semaphore errors
PratRanj07 Feb 14, 2024
4417733
Corrected yet another semaphore errors
PratRanj07 Feb 15, 2024
f7952b6
Corrected more semaphore errors
PratRanj07 Feb 15, 2024
6293a56
Minor changes
PratRanj07 Feb 27, 2024
b4a1f08
Whitespace error
PratRanj07 Feb 27, 2024
900540e
Requested changes
PratRanj07 Feb 29, 2024
49ced95
Whitespace error
PratRanj07 Feb 29, 2024
5e67ea3
Minor changes
PratRanj07 Feb 29, 2024
684efd8
Requested changes
PratRanj07 Mar 1, 2024
f0626ff
Whitespace error
PratRanj07 Mar 1, 2024
60b8feb
Error handling change in example
PratRanj07 Mar 4, 2024
654e069
Minor change in error handling example
PratRanj07 Mar 5, 2024
86ae277
Requested Changes
PratRanj07 May 1, 2024
5afe007
semaphore errors corrected
PratRanj07 May 1, 2024
d7f37db
Update examples/adminapi.py
PratRanj07 May 22, 2024
5e4cd7d
Update examples/adminapi.py
PratRanj07 May 22, 2024
50443bc
Update src/confluent_kafka/_model/__init__.py
PratRanj07 May 22, 2024
bd64ea6
Update src/confluent_kafka/admin/__init__.py
PratRanj07 May 22, 2024
fddaa6a
Update src/confluent_kafka/admin/__init__.py
PratRanj07 May 22, 2024
58ac04c
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
9ba4168
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
8f7f2a6
Update src/confluent_kafka/admin/__init__.py
PratRanj07 May 22, 2024
9f3152d
Update src/confluent_kafka/__init__.py
PratRanj07 May 22, 2024
e176626
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
fb202c8
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
766aa16
Merge branch 'master' into deleterecords
PratRanj07 May 22, 2024
e240e2f
requested changes
PratRanj07 May 22, 2024
cd5cfac
semaphore errors
PratRanj07 May 22, 2024
e46e3d6
iformatting errors
PratRanj07 May 22, 2024
1366b2d
small change
PratRanj07 May 22, 2024
8ed70cb
small change
PratRanj07 May 22, 2024
dc1e05c
requested changes
PratRanj07 May 24, 2024
8483b12
indentation errors
PratRanj07 May 24, 2024
293a050
indentation errors
PratRanj07 May 24, 2024
fba0f7b
Requested changes
PratRanj07 May 27, 2024
cdd3ae4
changelog changes
PratRanj07 May 27, 2024
e3b5b96
indentation
PratRanj07 May 27, 2024
9f5dcde
requested changes
PratRanj07 May 27, 2024
227cb0f
requested changes
PratRanj07 May 29, 2024
e359c0f
indentation errors
PratRanj07 May 29, 2024
d93162a
minor change
PratRanj07 May 30, 2024
d965435
Merge branch 'master' into deleterecords
pranavrth Jul 5, 2024
8172ca8
PR Comments
pranavrth Jul 8, 2024
60a4258
Added new test suggested in the PR
pranavrth Jul 8, 2024
65b3417
Fixed a memory leak in ListOffsets Result
pranavrth Jul 8, 2024
3f1a7f3
PR comments
pranavrth Jul 8, 2024
e1f484d
PR comments related to documentation
pranavrth Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions CHANGELOG.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add DeletedRecords to index.rst after ConsumerGroupDescription

- :ref:`DeletedRecords <pythonclient_deleted_records>`

and

.. _pythonclient_deleted_records:

**************
DeletedRecords
**************

.. autoclass:: confluent_kafka.admin.DeletedRecords
   :members:

Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
# Confluent's Python client for Apache Kafka

## v2.4.1
## v2.5.0

v2.4.1 is a maintenance release with the following fixes and enhancements:
v2.5.0 is a feature release with the following features, fixes and enhancements:

- [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710)
- Added an example to show the usage of the custom logger with `AdminClient`
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed documentation for default value of operation_timeout in create_topics, delete_topics, create_partitions apis (#1710)
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property

confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
confluent-kafka-python is based on librdkafka v2.5.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


Expand Down
50 changes: 42 additions & 8 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,24 +808,24 @@ def example_list_offsets(a, args):
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 3," +
f" got {len(args) - i}")
topic = args[i]
partition = int(args[i+1])
partition = int(args[i + 1])
topic_partition = TopicPartition(topic, partition)

if "EARLIEST" == args[i+2]:
if "EARLIEST" == args[i + 2]:
offset_spec = OffsetSpec.earliest()

elif "LATEST" == args[i+2]:
elif "LATEST" == args[i + 2]:
offset_spec = OffsetSpec.latest()

elif "MAX_TIMESTAMP" == args[i+2]:
elif "MAX_TIMESTAMP" == args[i + 2]:
offset_spec = OffsetSpec.max_timestamp()

elif "TIMESTAMP" == args[i+2]:
elif "TIMESTAMP" == args[i + 2]:
if i + 4 > len(args):
raise ValueError(
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 4" +
f", got {len(args) - i}")
offset_spec = OffsetSpec.for_timestamp(int(args[i+3]))
offset_spec = OffsetSpec.for_timestamp(int(args[i + 3]))
i += 1
else:
raise ValueError("Invalid OffsetSpec, must be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP")
Expand All @@ -845,6 +845,39 @@ def example_list_offsets(a, args):
.format(partition.topic, partition.partition, e))


def example_delete_records(a, args):
if len(args) == 0:
raise ValueError(
"Invalid number of arguments for delete_records, expected at least 3 " +
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")
if len(args) % 3 != 0:
raise ValueError(
"Invalid number of arguments for delete_records " +
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")

topic_partition_offsets = [
TopicPartition(topic, int(partition), int(offset))
for topic, partition, offset in zip(args[::3], args[1::3], args[2::3])
]

futmap = a.delete_records(topic_partition_offsets)
for partition, fut in futmap.items():
try:
result = fut.result()
if partition.offset == -1:
print(f"All records deleted in topic {partition.topic} partition {partition.partition}." +
f"The minimum offset in this partition is now {result.low_watermark}")
else:
print(
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
f" partition {partition.partition}. The minimum offset in this partition" +
f" is now {result.low_watermark}")
except KafkaException as e:
print(
f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
f" before offset {partition.offset}: {e}")


if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
Expand Down Expand Up @@ -883,7 +916,7 @@ def example_list_offsets(a, args):
' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
'[<topic2> <partition2> <offset_spec2> ..]\n')

sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
sys.exit(1)

broker = sys.argv[1]
Expand Down Expand Up @@ -913,7 +946,8 @@ def example_list_offsets(a, args):
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
'describe_user_scram_credentials': example_describe_user_scram_credentials,
'alter_user_scram_credentials': example_alter_user_scram_credentials,
'list_offsets': example_list_offsets}
'list_offsets': example_list_offsets,
'delete_records': example_delete_records}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
Expand Down
59 changes: 56 additions & 3 deletions src/confluent_kafka/admin/__init__.py
emasab marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
from ._listoffsets import (OffsetSpec, # noqa: F401
ListOffsetsResultInfo)

from ._records import DeletedRecords # noqa: F401

from .._model import TopicCollection as _TopicCollection

from ..cimpl import (KafkaException, # noqa: F401
Expand Down Expand Up @@ -535,6 +537,17 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs):
if not isinstance(kwargs['isolation_level'], _IsolationLevel):
raise TypeError("isolation_level argument should be an IsolationLevel")

@staticmethod
def _check_delete_records(request):
if not isinstance(request, list):
raise TypeError(f"Expected Request to be a list, got '{type(request).__name__}' ")
for req in request:
if not isinstance(req, _TopicPartition):
raise TypeError("Element of the request list must be of type 'TopicPartition'" +
f" got '{type(req).__name__}' ")
if req.partition < 0:
raise ValueError("'partition' cannot be negative")

pranavrth marked this conversation as resolved.
Show resolved Hide resolved
def create_topics(self, new_topics, **kwargs):
"""
Create one or more new topics.
Expand All @@ -544,7 +557,8 @@ def create_topics(self, new_topics, **kwargs):
:param float operation_timeout: The operation timeout in seconds,
controlling how long the CreateTopics request will block
on the broker waiting for the topic creation to propagate
in the cluster. A value of 0 returns immediately. Default: 0
in the cluster. A value of 0 returns immediately.
Default: `socket.timeout.ms/1000.0`
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -577,7 +591,8 @@ def delete_topics(self, topics, **kwargs):
:param float operation_timeout: The operation timeout in seconds,
controlling how long the DeleteTopics request will block
on the broker waiting for the topic deletion to propagate
in the cluster. A value of 0 returns immediately. Default: 0
in the cluster. A value of 0 returns immediately.
Default: `socket.timeout.ms/1000.0`
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
Expand Down Expand Up @@ -615,7 +630,8 @@ def create_partitions(self, new_partitions, **kwargs):
:param float operation_timeout: The operation timeout in seconds,
controlling how long the CreatePartitions request will block
on the broker waiting for the partition creation to propagate
in the cluster. A value of 0 returns immediately. Default: 0
in the cluster. A value of 0 returns immediately.
Default: `socket.timeout.ms/1000.0`
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
Expand Down Expand Up @@ -1205,3 +1221,40 @@ def list_offsets(self, topic_partition_offsets, **kwargs):

super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs)
return futmap

def delete_records(self, topic_partition_offsets, **kwargs):
"""
Deletes all the records before the specified offsets (not including),
in the specified Topic and Partitions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Written this way they seem like classes

Suggested change
in the specified Topic and Partitions.
in the specified topics and partitions.


:param list(TopicPartitions) topic_partition_offsets: A list of
TopicPartition objects having `offset` field set to the offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TopicPartition objects having `offset` field set to the offset
:class:`.TopicPartition` objects having `offset` field set to the offset

before which all the records should be deleted.
`offset` can be set to :py:const:`OFFSET_END` (-1) to delete all records
in the partition.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:param float operation_timeout: The operation timeout in seconds,
PratRanj07 marked this conversation as resolved.
Show resolved Hide resolved
emasab marked this conversation as resolved.
Show resolved Hide resolved
controlling how long the delete_records request will block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
controlling how long the delete_records request will block
controlling how long the `delete_records` request will block

on the broker waiting for the record deletion to propagate
in the cluster. A value of 0 returns immediately.
Default: `socket.timeout.ms/1000.0`

:returns: A dict of futures keyed by the TopicPartition.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:returns: A dict of futures keyed by the TopicPartition.
:returns: A dict of futures keyed by the :class:`.TopicPartition`.

The future result() method returns DeletedRecords
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The future result() method returns DeletedRecords
The future result() method returns :class:`.DeletedRecords`

or raises KafkaException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
or raises KafkaException
or raises :class:`.KafkaException`


:rtype: dict[TopicPartition, future]

:raises KafkaException: Operation failed locally or on broker.
:raises TypeError: Invalid input type.
:raises ValueError: Invalid input value.
"""
AdminClient._check_delete_records(topic_partition_offsets)

f, futmap = AdminClient._make_futures_v2(
topic_partition_offsets, _TopicPartition, AdminClient._make_futmap_result)

super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs)
return futmap
27 changes: 27 additions & 0 deletions src/confluent_kafka/admin/_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class DeletedRecords:
"""
DeletedRecords
Represents information about deleted records.

Parameters
----------
low_watermark: int
The "low watermark" for the topic partition on which the deletion was executed.
"""
def __init__(self, low_watermark):
self.low_watermark = low_watermark
Loading