Skip to content

Commit

Permalink
Added a delete records api (#1710)
Browse files Browse the repository at this point in the history
Added a delete records api

---------

Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
  • Loading branch information
PratRanj07 and pranavrth authored Jul 9, 2024
1 parent 858347c commit 65ab14c
Show file tree
Hide file tree
Showing 9 changed files with 487 additions and 35 deletions.
11 changes: 7 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
# Confluent's Python client for Apache Kafka

## v2.4.1
## v2.5.0

v2.4.1 is a maintenance release with the following fixes and enhancements:
v2.5.0 is a feature release with the following features, fixes and enhancements:

- [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710)
- Added an example to show the usage of the custom logger with `AdminClient`
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed documentation for default value of `operation_timeout` and `request_timeout` in various Admin apis (#1710)
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property
- Fixed some memory leaks related to `PyDict_SetItem`.

confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
confluent-kafka-python is based on librdkafka v2.5.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


Expand Down
10 changes: 10 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Supporting classes
- :ref:`MemberAssignment <pythonclient_member_assignment>`
- :ref:`MemberDescription <pythonclient_member_description>`
- :ref:`ConsumerGroupDescription <pythonclient_consumer_group_description>`
- :ref:`DeletedRecords <pythonclient_deleted_records>`

Experimental
These classes are experimental and are likely to be removed, or subject to incompatible
Expand Down Expand Up @@ -387,6 +388,15 @@ ConsumerGroupDescription
.. autoclass:: confluent_kafka.admin.ConsumerGroupDescription
:members:

.. _pythonclient_deleted_records:

**************
DeletedRecords
**************

.. autoclass:: confluent_kafka.admin.DeletedRecords
:members:

.. _pythonclient_member_assignment:

****************
Expand Down
50 changes: 42 additions & 8 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,24 +808,24 @@ def example_list_offsets(a, args):
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 3," +
f" got {len(args) - i}")
topic = args[i]
partition = int(args[i+1])
partition = int(args[i + 1])
topic_partition = TopicPartition(topic, partition)

if "EARLIEST" == args[i+2]:
if "EARLIEST" == args[i + 2]:
offset_spec = OffsetSpec.earliest()

elif "LATEST" == args[i+2]:
elif "LATEST" == args[i + 2]:
offset_spec = OffsetSpec.latest()

elif "MAX_TIMESTAMP" == args[i+2]:
elif "MAX_TIMESTAMP" == args[i + 2]:
offset_spec = OffsetSpec.max_timestamp()

elif "TIMESTAMP" == args[i+2]:
elif "TIMESTAMP" == args[i + 2]:
if i + 4 > len(args):
raise ValueError(
f"Invalid number of arguments for list offsets, partition {partition_i}, expected 4" +
f", got {len(args) - i}")
offset_spec = OffsetSpec.for_timestamp(int(args[i+3]))
offset_spec = OffsetSpec.for_timestamp(int(args[i + 3]))
i += 1
else:
raise ValueError("Invalid OffsetSpec, must be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP")
Expand All @@ -845,6 +845,39 @@ def example_list_offsets(a, args):
.format(partition.topic, partition.partition, e))


def example_delete_records(a, args):
if len(args) == 0:
raise ValueError(
"Invalid number of arguments for delete_records, expected at least 3 " +
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")
if len(args) % 3 != 0:
raise ValueError(
"Invalid number of arguments for delete_records " +
"(Usage: delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..])")

topic_partition_offsets = [
TopicPartition(topic, int(partition), int(offset))
for topic, partition, offset in zip(args[::3], args[1::3], args[2::3])
]

futmap = a.delete_records(topic_partition_offsets)
for partition, fut in futmap.items():
try:
result = fut.result()
if partition.offset == -1:
print(f"All records deleted in topic {partition.topic} partition {partition.partition}." +
f"The minimum offset in this partition is now {result.low_watermark}")
else:
print(
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
f" partition {partition.partition}. The minimum offset in this partition" +
f" is now {result.low_watermark}")
except KafkaException as e:
print(
f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
f" before offset {partition.offset}: {e}")


if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
Expand Down Expand Up @@ -883,7 +916,7 @@ def example_list_offsets(a, args):
' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
'[<topic2> <partition2> <offset_spec2> ..]\n')

sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
sys.exit(1)

broker = sys.argv[1]
Expand Down Expand Up @@ -913,7 +946,8 @@ def example_list_offsets(a, args):
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
'describe_user_scram_credentials': example_describe_user_scram_credentials,
'alter_user_scram_credentials': example_alter_user_scram_credentials,
'list_offsets': example_list_offsets}
'list_offsets': example_list_offsets,
'delete_records': example_delete_records}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
Expand Down
Loading

0 comments on commit 65ab14c

Please sign in to comment.