From 86ae2772eee2a0dc42c7c09c86746b1f81000cb0 Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Thu, 2 May 2024 03:50:05 +0530
Subject: [PATCH] Requested changes: return DeleteRecordsResult from
 delete_records futures

---
 examples/adminapi.py                           | 12 ++---
 src/confluent_kafka/__init__.py                |  5 +-
 src/confluent_kafka/_model/__init__.py         | 13 ++++++
 src/confluent_kafka/admin/__init__.py          | 12 ++---
 src/confluent_kafka/src/Admin.c                | 46 ++++++++++++++++++-
 .../integration/admin/test_delete_records.py   | 22 +++------
 6 files changed, 75 insertions(+), 35 deletions(-)

diff --git a/examples/adminapi.py b/examples/adminapi.py
index aed5b66ae..6eb9d38ba 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -862,19 +862,13 @@ def example_delete_records(a, args):
     for partition, fut in futmap.items():
         try:
             result = fut.result()
-            if result.error:
-                print(f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
-                      f" before offset {partition.offset}: {result.error.str()}")
-            else:
-                print(
-                    f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
-                    f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
+            print(
+                f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
+                f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
         except KafkaException as e:
             print(
                 f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
                 f" before offset {partition.offset}: {e}")
-        except Exception:
-            raise
 
 
 if __name__ == '__main__':
diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py
index bab73a2c6..7b9883bb9 100644
--- a/src/confluent_kafka/__init__.py
+++ b/src/confluent_kafka/__init__.py
@@ -24,7 +24,8 @@
                      ConsumerGroupState,
                      TopicCollection,
                      TopicPartitionInfo,
-                     IsolationLevel)
+                     IsolationLevel,
+                     DeleteRecordsResult)
 
 from .cimpl import (Producer,
                     Consumer,
@@ -49,7 +50,7 @@
            'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
            'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
            'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
-           'IsolationLevel']
+           'IsolationLevel', 'DeleteRecordsResult']
 
 __version__ = version()[0]
 
diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py
index 1c2ec89f0..8b9983242 100644
--- a/src/confluent_kafka/_model/__init__.py
+++ b/src/confluent_kafka/_model/__init__.py
@@ -149,3 +149,16 @@ def __lt__(self, other):
         if self.__class__ != other.__class__:
             return NotImplemented
         return self.value < other.value
+
+class DeleteRecordsResult:
+    """
+    DeleteRecordsResult
+    Result of an `AdminClient.delete_records` call for a single partition.
+
+    Parameters
+    ----------
+    offset: int
+        The partition's new low-watermark offset (the smallest offset still available after the deletion).
+    """
+    def __init__(self, offset):
+        self.offset = offset
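
Note: as the Admin.c change later in this patch shows, the C layer instantiates this class through PyObject_Call with a single `offset` keyword argument, so a successful per-partition future resolves to the pure-Python equivalent of the following (the offset value here is illustrative):

    from confluent_kafka import DeleteRecordsResult

    # What the binding constructs for a partition whose records were deleted;
    # 42 stands in for the partition's new low watermark.
    res = DeleteRecordsResult(offset=42)
    assert res.offset == 42
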
+ """ + def __init__(self, offset): + self.offset = offset diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 05878bccb..304bff067 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -543,10 +543,8 @@ def _check_delete_records(request): for req in request: if not isinstance(req, _TopicPartition): raise TypeError("Element of the request list must be of type 'TopicPartition' ") - if req is None: - raise ValueError("Individual request in the request list cannot be 'None'") if req.partition < 0: - raise ValueError("Elements of the list must not have negative value for 'partition' field") + raise ValueError(" 'partition' cannot be negative") def create_topics(self, new_topics, **kwargs): """ @@ -1235,10 +1233,8 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): in the cluster. A value of 0 returns immediately. Default: 0 :returns: A dict of futures keyed by the TopicPartition. - The future result() method returns a TopicPartition list indicating that - deletion operation have been performed till the specified Topic Partition - and error if any has occured. User has to check if any error has occured - during deletion in each partition. + The future result() method returns DeleteRecordsResult + or raises KafkaException :rtype: dict[TopicPartition, future] @@ -1249,7 +1245,7 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): AdminClient._check_delete_records(topic_partition_offsets_list) f, futmap = AdminClient._make_futures_v2( - topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list) + topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result) super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs) return futmap diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 0740f2db9..f7e1198fd 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4470,6 +4470,50 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset return NULL; } +static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) { + + size_t c_topic_partition_cnt = c_topic_partitions->cnt; + PyObject *result = NULL; + PyObject *DeleteRecordsResult_type = NULL; + size_t i; + + DeleteRecordsResult_type = cfl_PyObject_lookup("confluent_kafka", "DeleteRecordsResult"); + + if(!DeleteRecordsResult_type){ + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecordsResult type"); + return NULL; + } + + result = PyDict_New(); + for(i=0; ielems[i]; + if (c_topic_partitions->elems[i].err) { + value = KafkaError_new_or_None(c_topic_partitions->elems[i].err, rd_kafka_err2str(c_topic_partitions->elems[i].err)); + } else { + PyObject *args = NULL; + PyObject *kwargs = NULL; + kwargs = PyDict_New(); + cfl_PyDict_SetLong(kwargs, "offset", c_topic_partitions->elems[i].offset); + args = PyTuple_New(0); + value = PyObject_Call(DeleteRecordsResult_type, args, kwargs); + Py_DECREF(args); + Py_DECREF(kwargs); + if (value == NULL) + goto raise; + } + PyDict_SetItem(result, c_part_to_py(c_topic_partition), value); + Py_DECREF(value); + } + + Py_DECREF(DeleteRecordsResult_type); + return result; +raise: + Py_DECREF(result); + Py_DECREF(DeleteRecordsResult_type); + return NULL; +} + /** * @brief Event callback triggered from librdkafka's background thread * when Admin API results are ready. 
diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py
index 8ebc0f9b0..d80081093 100644
--- a/tests/integration/admin/test_delete_records.py
+++ b/tests/integration/admin/test_delete_records.py
@@ -13,8 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
-from confluent_kafka import TopicPartition
+from confluent_kafka.admin import OffsetSpec
+from confluent_kafka import TopicPartition, DeleteRecordsResult
 
 
 def test_delete_records(kafka_cluster):
@@ -43,29 +43,21 @@ def test_delete_records(kafka_cluster):
 
     # Check if the earliest avilable offset for this topic partition is 0
     fs = admin_client.list_offsets(requests)
-    for _, fut in fs.items():
-        result = fut.result()
-        assert isinstance(result, ListOffsetsResultInfo)
-        assert (result.offset == 0)
+    result = list(fs.values())[0].result()
+    assert (result.offset == 0)
 
     topic_partition_offset = TopicPartition(topic, 0, 2)
 
     # Delete the records
     fs1 = admin_client.delete_records([topic_partition_offset])
 
-    earliest_offset_available = 0
     # Find the earliest available offset for that specific topic partition after deletion has been done
     fs2 = admin_client.list_offsets(requests)
-    for _, fut in fs2.items():
-        result = fut.result()
-        assert isinstance(result, ListOffsetsResultInfo)
-        earliest_offset_available = result.offset
 
     # Check if the earliest available offset is equal to the offset passed to the delete records function
-    for _, fut in fs1.items():
-        result = fut.result()
-        assert isinstance(result, TopicPartition)
-        assert (result.offset == earliest_offset_available)
+    res = list(fs1.values())[0].result()
+    assert isinstance(res, DeleteRecordsResult)
+    assert (res.offset == list(fs2.values())[0].result().offset)
 
     # Delete created topic
     fs = admin_client.delete_topics([topic])
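
Taken together, a minimal end-to-end sketch of the reworked API, assuming a broker on localhost:9092 and an existing single-partition topic named "mytopic" with at least two records (names and offsets are illustrative):

    from confluent_kafka import TopicPartition, DeleteRecordsResult, KafkaException
    from confluent_kafka.admin import AdminClient, OffsetSpec

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})

    try:
        # Delete everything before offset 2 in partition 0.
        fut = list(admin.delete_records([TopicPartition("mytopic", 0, 2)]).values())[0]
        res = fut.result()
        assert isinstance(res, DeleteRecordsResult)

        # The result's offset is the partition's new low watermark, so it
        # should agree with what list_offsets reports as earliest.
        req = {TopicPartition("mytopic", 0): OffsetSpec.earliest()}
        earliest = list(admin.list_offsets(req).values())[0].result()
        assert res.offset == earliest.offset
    except KafkaException as e:
        print(f"delete_records failed: {e}")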