Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a delete records api #1710

Merged
merged 49 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
863a1e8
Added a delete records api
PratRanj07 Feb 14, 2024
93fc6c3
Corrected semaphore errors
PratRanj07 Feb 14, 2024
7336986
Corrected another semaphore errors
PratRanj07 Feb 14, 2024
4417733
Corrected yet another semaphore errors
PratRanj07 Feb 15, 2024
f7952b6
Corrected more semaphore errors
PratRanj07 Feb 15, 2024
6293a56
Minor changes
PratRanj07 Feb 27, 2024
b4a1f08
Whitespace error
PratRanj07 Feb 27, 2024
900540e
Requested changes
PratRanj07 Feb 29, 2024
49ced95
Whitespace error
PratRanj07 Feb 29, 2024
5e67ea3
Minor changes
PratRanj07 Feb 29, 2024
684efd8
Requested changes
PratRanj07 Mar 1, 2024
f0626ff
Whitespace error
PratRanj07 Mar 1, 2024
60b8feb
Error handling change in example
PratRanj07 Mar 4, 2024
654e069
Minor change in error handling example
PratRanj07 Mar 5, 2024
86ae277
Requested Changes
PratRanj07 May 1, 2024
5afe007
semaphore errors corrected
PratRanj07 May 1, 2024
d7f37db
Update examples/adminapi.py
PratRanj07 May 22, 2024
5e4cd7d
Update examples/adminapi.py
PratRanj07 May 22, 2024
50443bc
Update src/confluent_kafka/_model/__init__.py
PratRanj07 May 22, 2024
bd64ea6
Update src/confluent_kafka/admin/__init__.py
PratRanj07 May 22, 2024
fddaa6a
Update src/confluent_kafka/admin/__init__.py
PratRanj07 May 22, 2024
58ac04c
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
9ba4168
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
8f7f2a6
Update src/confluent_kafka/admin/__init__.py
PratRanj07 May 22, 2024
9f3152d
Update src/confluent_kafka/__init__.py
PratRanj07 May 22, 2024
e176626
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
fb202c8
Update src/confluent_kafka/src/Admin.c
PratRanj07 May 22, 2024
766aa16
Merge branch 'master' into deleterecords
PratRanj07 May 22, 2024
e240e2f
requested changes
PratRanj07 May 22, 2024
cd5cfac
semaphore errors
PratRanj07 May 22, 2024
e46e3d6
iformatting errors
PratRanj07 May 22, 2024
1366b2d
small change
PratRanj07 May 22, 2024
8ed70cb
small change
PratRanj07 May 22, 2024
dc1e05c
requested changes
PratRanj07 May 24, 2024
8483b12
indentation errors
PratRanj07 May 24, 2024
293a050
indentation errors
PratRanj07 May 24, 2024
fba0f7b
Requested changes
PratRanj07 May 27, 2024
cdd3ae4
changelog changes
PratRanj07 May 27, 2024
e3b5b96
indentation
PratRanj07 May 27, 2024
9f5dcde
requested changes
PratRanj07 May 27, 2024
227cb0f
requested changes
PratRanj07 May 29, 2024
e359c0f
indentation errors
PratRanj07 May 29, 2024
d93162a
minor change
PratRanj07 May 30, 2024
d965435
Merge branch 'master' into deleterecords
pranavrth Jul 5, 2024
8172ca8
PR Comments
pranavrth Jul 8, 2024
60a4258
Added new test suggested in the PR
pranavrth Jul 8, 2024
65b3417
Fixed a memory leak in ListOffsets Result
pranavrth Jul 8, 2024
3f1a7f3
PR comments
pranavrth Jul 8, 2024
e1f484d
PR comments related to documentation
pranavrth Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions CHANGELOG.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add DeletedRecords to index.rst after ConsumerGroupDescription

- :ref:`DeletedRecords <pythonclient_deleted_records>`

and

.. _pythonclient_deleted_records:

**************
DeletedRecords
**************

.. autoclass:: confluent_kafka.admin.DeletedRecords
   :members:

Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
# Confluent's Python client for Apache Kafka

## v2.4.1
## v2.5.0

v2.4.1 is a maintenance release with the following fixes and enhancements:
v2.5.0 is a feature release with the following features, fixes and enhancements:

- [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710)
- Added an example to show the usage of the custom logger with `AdminClient`
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed documentation for default value of `operation_timeout` and `request_timeout` in various Admin apis (#1710)
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property
- Fixed some memory leaks related to `PyDict_SetItem`.

confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
confluent-kafka-python is based on librdkafka v2.5.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


Expand Down
10 changes: 10 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Supporting classes
- :ref:`MemberAssignment <pythonclient_member_assignment>`
- :ref:`MemberDescription <pythonclient_member_description>`
- :ref:`ConsumerGroupDescription <pythonclient_consumer_group_description>`
- :ref:`DeletedRecords <pythonclient_deleted_records>`

Experimental
These classes are experimental and are likely to be removed, or subject to incompatible
Expand Down Expand Up @@ -387,6 +388,15 @@ ConsumerGroupDescription
.. autoclass:: confluent_kafka.admin.ConsumerGroupDescription
:members:

.. _pythonclient_deleted_records:

**************
DeletedRecords
**************

.. autoclass:: confluent_kafka.admin.DeletedRecords
:members:

.. _pythonclient_member_assignment:

****************
Expand Down
50 changes: 42 additions & 8 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,24 +808,24 @@ def example_list_offsets(a, args):
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 3," +
f" got {len(args) - i}")
topic = args[i]
partition = int(args[i+1])
partition = int(args[i + 1])
topic_partition = TopicPartition(topic, partition)

if "EARLIEST" == args[i+2]:
if "EARLIEST" == args[i + 2]:
offset_spec = OffsetSpec.earliest()

elif "LATEST" == args[i+2]:
elif "LATEST" == args[i + 2]:
offset_spec = OffsetSpec.latest()

elif "MAX_TIMESTAMP" == args[i+2]:
elif "MAX_TIMESTAMP" == args[i + 2]:
offset_spec = OffsetSpec.max_timestamp()

elif "TIMESTAMP" == args[i+2]:
elif "TIMESTAMP" == args[i + 2]:
if i + 4 > len(args):
raise ValueError(
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 4" +
f", got {len(args) - i}")
offset_spec = OffsetSpec.for_timestamp(int(args[i+3]))
offset_spec = OffsetSpec.for_timestamp(int(args[i + 3]))
i += 1
else:
raise ValueError("Invalid OffsetSpec, must be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP")
Expand All @@ -845,6 +845,39 @@ def example_list_offsets(a, args):
.format(partition.topic, partition.partition, e))


def example_delete_records(a, args):
if len(args) == 0:
raise ValueError(
"Invalid number of arguments for delete_records, expected at least 3 " +
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")
if len(args) % 3 != 0:
raise ValueError(
"Invalid number of arguments for delete_records " +
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")

topic_partition_offsets = [
TopicPartition(topic, int(partition), int(offset))
for topic, partition, offset in zip(args[::3], args[1::3], args[2::3])
]

futmap = a.delete_records(topic_partition_offsets)
for partition, fut in futmap.items():
try:
result = fut.result()
if partition.offset == -1:
print(f"All records deleted in topic {partition.topic} partition {partition.partition}." +
f"The minimum offset in this partition is now {result.low_watermark}")
else:
print(
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
f" partition {partition.partition}. The minimum offset in this partition" +
f" is now {result.low_watermark}")
except KafkaException as e:
print(
f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
f" before offset {partition.offset}: {e}")


if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
Expand Down Expand Up @@ -883,7 +916,7 @@ def example_list_offsets(a, args):
' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
'[<topic2> <partition2> <offset_spec2> ..]\n')

sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
sys.exit(1)

broker = sys.argv[1]
Expand Down Expand Up @@ -913,7 +946,8 @@ def example_list_offsets(a, args):
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
'describe_user_scram_credentials': example_describe_user_scram_credentials,
'alter_user_scram_credentials': example_alter_user_scram_credentials,
'list_offsets': example_list_offsets}
'list_offsets': example_list_offsets,
'delete_records': example_delete_records}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
Expand Down
Loading