From 863a1e8faf80cb9d0922c3c168ddb3762d4c1940 Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Wed, 14 Feb 2024 17:46:37 +0530
Subject: [PATCH 01/47] Added a delete records api

---
 examples/adminapi.py                        |  38 ++++++-
 src/confluent_kafka/admin/__init__.py       |  51 ++++++++++
 src/confluent_kafka/src/Admin.c             | 106 ++++++++++++++++++++
 tests/integration/admin/test_del_records.py |  66 ++++++++++++
 tests/test_Admin.py                         |  24 +++++
 5 files changed, 282 insertions(+), 3 deletions(-)
 create mode 100644 tests/integration/admin/test_del_records.py

diff --git a/examples/adminapi.py b/examples/adminapi.py
index 390aba030..244645db5 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -843,7 +843,37 @@ def example_list_offsets(a, args):
         except KafkaException as e:
             print("Topicname : {} Partition_Index : {} Error : {}"
                   .format(partition.topic, partition.partition, e))
-
+
+def example_del_records(a, args):
+    topic_partition_offset = []
+    if len(args) == 0:
+        raise ValueError(
+            "Invalid number of arguments for list offsets, expected at least 1, got 0")
+    i = 0
+    partition_i = 1
+    while i < len(args):
+        if i + 3 > len(args):
+            raise ValueError(
+                f"Invalid number of arguments for del_records, partition {partition_i}, expected 3," +
+                f" got {len(args) - i}")
+        topic = args[0]
+        partition = int(args[1])
+        offset = int(args[2])
+        topic_partition_offset.append(TopicPartition(topic,partition,offset ))
+        i+=3
+        partition_i+=1
+
+    futmap = a.del_records(topic_partition_offset)
+    for partition, fut in futmap.items():
+        try:
+            result = fut.result()
+            print("Deleted before offset : {} in topicname : {} partition : {}".format(result.offset,partition.topic,partition.partition))
+        except KafkaException as e:
+            print("Error in deleting Topicname : {} Partition_Index : {} Offset :{} Error : {}"
+                  .format(partition.topic, partition.partition, partition.offset, e))
+        except Exception:
+            raise
+
 if __name__ == '__main__':
     if len(sys.argv) < 3:
@@ -883,7 +913,8 @@ def example_list_offsets(a, args):
                          ' DELETE ..]\n')
     sys.stderr.write(' list_offsets ' +
                      '[ ..]\n')
-
+    sys.stderr.write(' del_records ' +
+                     '[ ]\n')
     sys.exit(1)

     broker = sys.argv[1]
@@ -913,7 +944,8 @@ def example_list_offsets(a, args):
         'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
         'describe_user_scram_credentials': example_describe_user_scram_credentials,
         'alter_user_scram_credentials': example_alter_user_scram_credentials,
-        'list_offsets': example_list_offsets}
+        'list_offsets': example_list_offsets,
+        'del_records': example_del_records}

     if operation not in opsmap:
         sys.stderr.write('Unknown operation: %s\n' % operation)
diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index 924361f2e..589baa07e 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -534,6 +534,26 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs):
             if not isinstance(kwargs['isolation_level'], _IsolationLevel):
                 raise TypeError("isolation_level argument should be an IsolationLevel")

+    @staticmethod
+    def _check_del_records(request):
+        if request is None:
+            raise TypeError("request cannot be None")
+        if not isinstance(request,list):
+            raise TypeError("request must be a list")
+        for req in request:
+            if not isinstance(req,_TopicPartition):
+                raise TypeError("Element of the list must be of type 'TopicPartitions' ")
+            if req is None:
+                raise ValueError("'topic_partitions' cannot be null")
+            if req.topic is None:
+                raise TypeError("Elements of the list must not have topic attribute as None")
+            if not req.topic:
+                raise ValueError("Elements of the list must not have topic attrubute empty")
+            if req.partition<0:
+                raise ValueError("Elements of the list must not have negative value for 'partition' field")
+            if req.offset<0:
+                raise ValueError("Elements of the list must not have negative value for offset field")
+
     def create_topics(self, new_topics, **kwargs):
         """
         Create one or more new topics.
@@ -1204,3 +1224,34 @@ def list_offsets(self, topic_partition_offsets, **kwargs):

         super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs)
         return futmap
+
+    def del_records(self,topic_partition_offsets_list,**kwargs):
+        """
+        Helps to delete all the records before the specified offset,
+        in the specified Topic and Partition.
+
+        :param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects
+        consisting of the Topic Partition and Offsets on which we have to perform the deleteion.
+        :param float request_timeout: The overall request timeout in seconds,
+        including broker lookup, request transmission, operation time
+        on broker, and response. Default: `socket.timeout.ms*1000.0`
+
+        :returns: A dict of futures keyed by the TopicPartition.
+        The future result() method returns a TopicPartition list indicating that
+        deletion operation have been performed till the specified Topic Partition
+        and error if any has occured.
+
+        :rtype: dict[str, future]
+
+        :raises KafkaException: Operation failed locally or on broker.
+        :raises TypeError: Invalid input type.
+        :raises ValueError: Invalid input value.
+        """
+        AdminClient._check_del_records(topic_partition_offsets_list)
+
+        f,futmap = AdminClient._make_futures_v2(topic_partition_offsets_list,
+                                                _TopicPartition,
+                                                AdminClient._make_futmap_result_from_list)
+
+        super(AdminClient,self).del_records(topic_partition_offsets_list, f, **kwargs)
+        return futmap
diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c
index 8515c2aa8..f3d292937 100644
--- a/src/confluent_kafka/src/Admin.c
+++ b/src/confluent_kafka/src/Admin.c
@@ -2963,6 +2963,99 @@ const char Admin_list_offsets_doc[] = PyDoc_STR(
         " This method should not be used directly, use confluent_kafka.AdminClient.list_offsets()\n");

+
+/**
+ * @brief Delete records
+ */
+PyObject* Admin_del_records (Handle *self,PyObject *args,PyObject *kwargs){
+        PyObject* topic_partition_offset, *future;
+        int del_record_cnt = 1;
+        rd_kafka_DeleteRecords_t **c_obj = NULL;
+        struct Admin_options options = Admin_options_INITIALIZER;
+        rd_kafka_AdminOptions_t *c_options = NULL;
+        rd_kafka_topic_partition_list_t *c_topic_partition_offset = NULL;
+        CallState cs;
+        rd_kafka_queue_t *rkqu;
+
+        static char *kws[] = {"topic_partition_offset",
+                              "future",
+                              /* options */
+                              "request_timeout",
+                              NULL};
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|f", kws,
+                                         &topic_partition_offset,
+                                         &future,
+                                         &options.request_timeout)) {
+                goto err;
+        }
+
+        c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETERECORDS,
+                                       &options, future);
+        if (!c_options) {
+                goto err; /* Exception raised by options_to_c() */
+        }
+
+        /* options_to_c() sets future as the opaque, which is used in the
+         * background_event_cb to set the results on the future as the
+         * admin operation is finished, so we need to keep our own refcount. */
+        Py_INCREF(future);
+
+        if (!PyList_Check(topic_partition_offset)) {
+                PyErr_SetString(PyExc_ValueError,
+                                "topic_partitions_offset must be a list");
+                goto err;
+        }
+        c_topic_partition_offset = py_to_c_parts(topic_partition_offset);
+
+        c_obj = malloc(sizeof(rd_kafka_DeleteRecords_t *) * del_record_cnt);
+        c_obj[0] = rd_kafka_DeleteRecords_new(c_topic_partition_offset);
+
+        /* Use librdkafka's background thread queue to automatically dispatch
+         * Admin_background_event_cb() when the admin operation is finished. */
+        rkqu = rd_kafka_queue_get_background(self->rk);
+
+        /*
+         * Call DeleteRecords
+         *
+         * We need to set up a CallState and release GIL here since
+         * the event_cb may be triggered immediately.
+         */
+        CallState_begin(self, &cs);
+        rd_kafka_DeleteRecords(self->rk,c_obj,del_record_cnt,c_options,rkqu);
+        CallState_end(self,&cs);
+        rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
+        rd_kafka_DeleteRecords_destroy_array(c_obj,del_record_cnt);
+        free(c_obj);
+        Py_XDECREF(topic_partition_offset);
+        rd_kafka_AdminOptions_destroy(c_options);
+        rd_kafka_topic_partition_list_destroy(c_topic_partition_offset);
+
+        Py_RETURN_NONE;
+err:
+
+        if (c_obj) {
+                rd_kafka_DeleteRecords_destroy_array(c_obj, del_record_cnt);
+                free(c_obj);
+        }
+        if (c_options) {
+                rd_kafka_AdminOptions_destroy(c_options);
+                Py_DECREF(future);
+        }
+        if(c_topic_partition_offset) {
+                rd_kafka_topic_partition_list_destroy(c_topic_partition_offset);
+        }
+        Py_XDECREF(topic_partition_offset);
+        return NULL;
+
+}
+
+const char Admin_del_records_doc[] = PyDoc_STR(
+        ".. py:function:: del_records(topic_partitions, future, [request_timeout])\n"
+        "\n"
+        " Delete records for the particular topic partition before the specified offset provided in the request.\n"
+        "\n"
+        " This method should not be used directly, use confluent_kafka.AdminClient.del_records()\n");
+
 /**
  * @brief Call rd_kafka_poll() and keep track of crashing callbacks.
  * @returns -1 if callback crashed (or poll() failed), else the number
@@ -3124,6 +3217,10 @@ static PyMethodDef Admin_methods[] = {
         { "list_offsets", (PyCFunction)Admin_list_offsets, METH_VARARGS|METH_KEYWORDS,
           Admin_list_offsets_doc
         },
+
+        { "del_records", (PyCFunction)Admin_del_records, METH_VARARGS|METH_KEYWORDS,
+          Admin_del_records_doc
+        },

         { NULL }
 };
@@ -4712,6 +4809,15 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
                 break;
         }

+        case RD_KAFKA_EVENT_DELETERECORDS_RESULT:
+        {
+                const rd_kafka_DeleteRecords_result_t *c_del_records_res = rd_kafka_event_DeleteRecords_result(rkev);
+                const rd_kafka_topic_partition_list_t *c_del_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_del_records_res);
+
+                result = c_parts_to_py(c_del_records_res_list);
+                break;
+        }
+
         default:
                 Py_DECREF(error); /* Py_None */
                 error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
diff --git a/tests/integration/admin/test_del_records.py b/tests/integration/admin/test_del_records.py
new file mode 100644
index 000000000..728b85ad0
--- /dev/null
+++ b/tests/integration/admin/test_del_records.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# Copyright 2023 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
+from confluent_kafka import TopicPartition, IsolationLevel
+
+def test_del_records(kafka_cluster):
+    """
+    Test del_records, delete the records upto the specified offset
+    in that particular partition of the specified topic.
+    """
+    admin_client = kafka_cluster.admin()
+
+    # Create a topic with a single partition
+    topic = kafka_cluster.create_topic("test-del-records",
+                                       {
+                                           "num_partitions": 1,
+                                           "replication_factor": 1,
+                                       })
+
+    # Create Producer instance
+    p = kafka_cluster.producer()
+    p.produce(topic, "Message-1",)
+    p.produce(topic, "Message-2")
+    p.produce(topic, "Message-3")
+    p.flush()
+
+    topic_partition = TopicPartition(topic, 0)
+    requests = {topic_partition: OffsetSpec.earliest()}
+    topic_partition_offset = TopicPartition(topic,0,2)
+
+    kwargs = {"isolation_level": IsolationLevel.READ_UNCOMMITTED}
+
+    #Delete the records
+    fs1 = admin_client.del_records([topic_partition_offset])
+    earliest_offset_available = 0
+
+    #Find the earliest avilable offset for that specific topic partition
+    fs2 = admin_client.list_offsets(requests,**kwargs)
+    for _, fut in fs2.items():
+        result = fut.result()
+        assert isinstance(result,ListOffsetsResultInfo)
+        earliest_offset_available = result.offset
+
+    #Check if the earliest available offset is equal to the offset passes to the delete records function
+    for _, fut in fs1.items():
+        result = fut.result()
+        assert isinstance(result,TopicPartition)
+        assert (result.offset == earliest_offset_available)
+
+    # Delete created topic
+    fs = admin_client.delete_topics([topic])
+    for topic, f in fs.items():
+        f.result()
\ No newline at end of file
diff --git a/tests/test_Admin.py b/tests/test_Admin.py
index b59cefb68..482008d49 100644
--- a/tests/test_Admin.py
+++ b/tests/test_Admin.py
@@ -1169,3 +1169,27 @@ def test_list_offsets_api():
         ]:
             with pytest.raises(TypeError):
                 a.list_offsets(requests, **kwargs)
+
+def test_del_records():
+    a = AdminClient({"socket.timeout.ms": 10})
+
+    # Request-type tests
+    with pytest.raises(TypeError):
+        a.del_records(None)
+
+    with pytest.raises(TypeError):
+        a.del_records(1)
+
+    #Request-specific tests
+    with pytest.raises(TypeError):
+        a.del_records(["test-1"])
+
+    with pytest.raises(TypeError):
+        a.del_records([TopicPartition(None)])
+
+    with pytest.raises(ValueError):
+        a.del_records([TopicPartition("")])
+
+    with pytest.raises(ValueError):
+        a.del_records([TopicPartition("test-topic1")])
+
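Taken together, patch 01 wires AdminClient.del_records() through the new C binding down to librdkafka's DeleteRecords API: the call returns per-partition futures keyed by TopicPartition, resolved from the background event callback. A minimal usage sketch of the API as it stands at this point in the series — the broker address and topic name are placeholders, not part of the patch:

    from confluent_kafka import KafkaException, TopicPartition
    from confluent_kafka.admin import AdminClient

    # Placeholder broker; point this at a reachable cluster.
    a = AdminClient({"bootstrap.servers": "localhost:9092"})

    # Ask the broker to delete everything in partition 0 of "mytopic"
    # below offset 2 (i.e. offsets 0 and 1).
    futmap = a.del_records([TopicPartition("mytopic", 0, 2)])

    for partition, fut in futmap.items():
        try:
            # At this stage of the series the future resolves to a
            # TopicPartition whose offset is the new low watermark.
            result = fut.result()
            print(partition.topic, partition.partition, result.offset)
        except KafkaException as e:
            print("delete failed:", e)
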
print("Topicname : {} Partition_Index : {} Error : {}" .format(partition.topic, partition.partition, e)) - + + def example_del_records(a, args): topic_partition_offset = [] if len(args) == 0: @@ -859,21 +860,22 @@ def example_del_records(a, args): topic = args[0] partition = int(args[1]) offset = int(args[2]) - topic_partition_offset.append(TopicPartition(topic,partition,offset )) - i+=3 - partition_i+=1 + topic_partition_offset.append(TopicPartition(topic, partition, offset)) + i += 3 + partition_i += 1 futmap = a.del_records(topic_partition_offset) for partition, fut in futmap.items(): try: result = fut.result() - print("Deleted before offset : {} in topicname : {} partition : {}".format(result.offset,partition.topic,partition.partition)) + print("Deleted before offset : {} in topicname : {} partition : {}".format(result.offset, + partition.topic, partition.partition)) except KafkaException as e: print("Error in deleting Topicname : {} Partition_Index : {} Offset :{} Error : {}" .format(partition.topic, partition.partition, partition.offset, e)) except Exception: raise - + if __name__ == '__main__': if len(sys.argv) < 3: diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 589baa07e..eccef1f20 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -538,10 +538,10 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs): def _check_del_records(request): if request is None: raise TypeError("request cannot be None") - if not isinstance(request,list): + if not isinstance(request, list): raise TypeError("request must be a list") for req in request: - if not isinstance(req,_TopicPartition): + if not isinstance(req, _TopicPartition): raise TypeError("Element of the list must be of type 'TopicPartitions' ") if req is None: raise ValueError("'topic_partitions' cannot be null") @@ -549,11 +549,11 @@ def _check_del_records(request): raise TypeError("Elements of the list must not have topic attribute as None") if not req.topic: raise ValueError("Elements of the list must not have topic attrubute empty") - if req.partition<0: + if req.partition < 0: raise ValueError("Elements of the list must not have negative value for 'partition' field") - if req.offset<0: + if req.offset < 0: raise ValueError("Elements of the list must not have negative value for offset field") - + def create_topics(self, new_topics, **kwargs): """ Create one or more new topics. @@ -1224,21 +1224,21 @@ def list_offsets(self, topic_partition_offsets, **kwargs): super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs) return futmap - - def del_records(self,topic_partition_offsets_list,**kwargs): + + def del_records(self, topic_partition_offsets_list, **kwargs): """ Helps to delete all the records before the specified offset, in the specified Topic and Partition. - :param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects + :param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects consisting of the Topic Partition and Offsets on which we have to perform the deleteion. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` :returns: A dict of futures keyed by the TopicPartition. 
- The future result() method returns a TopicPartition list indicating that - deletion operation have been performed till the specified Topic Partition + The future result() method returns a TopicPartition list indicating that + deletion operation have been performed till the specified Topic Partition and error if any has occured. :rtype: dict[str, future] @@ -1249,9 +1249,9 @@ def del_records(self,topic_partition_offsets_list,**kwargs): """ AdminClient._check_del_records(topic_partition_offsets_list) - f,futmap = AdminClient._make_futures_v2(topic_partition_offsets_list, + f,futmap = AdminClient._make_futures_v2(topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list) - - super(AdminClient,self).del_records(topic_partition_offsets_list, f, **kwargs) + + super(AdminClient, self).del_records(topic_partition_offsets_list, f, **kwargs) return futmap diff --git a/tests/integration/admin/test_del_records.py b/tests/integration/admin/test_del_records.py index 728b85ad0..c59d1c571 100644 --- a/tests/integration/admin/test_del_records.py +++ b/tests/integration/admin/test_del_records.py @@ -16,7 +16,8 @@ from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec from confluent_kafka import TopicPartition, IsolationLevel -def test_del_records(kafka_cluster): + +def test_del_records(kafka_cluster): """ Test del_records, delete the records upto the specified offset in that particular partition of the specified topic. @@ -39,15 +40,15 @@ def test_del_records(kafka_cluster): topic_partition = TopicPartition(topic, 0) requests = {topic_partition: OffsetSpec.earliest()} - topic_partition_offset = TopicPartition(topic,0,2) + topic_partition_offset = TopicPartition(topic, 0, 2) kwargs = {"isolation_level": IsolationLevel.READ_UNCOMMITTED} - - #Delete the records + + # Delete the records fs1 = admin_client.del_records([topic_partition_offset]) earliest_offset_available = 0 - #Find the earliest avilable offset for that specific topic partition + # Find the earliest avilable offset for that specific topic partition fs2 = admin_client.list_offsets(requests,**kwargs) for _, fut in fs2.items(): result = fut.result() @@ -63,4 +64,4 @@ def test_del_records(kafka_cluster): # Delete created topic fs = admin_client.delete_topics([topic]) for topic, f in fs.items(): - f.result() \ No newline at end of file + f.result() diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 482008d49..d48408aba 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1170,17 +1170,18 @@ def test_list_offsets_api(): with pytest.raises(TypeError): a.list_offsets(requests, **kwargs) + def test_del_records(): a = AdminClient({"socket.timeout.ms": 10}) - + # Request-type tests with pytest.raises(TypeError): a.del_records(None) with pytest.raises(TypeError): a.del_records(1) - - #Request-specific tests + + # Request-specific tests with pytest.raises(TypeError): a.del_records(["test-1"]) @@ -1192,4 +1193,3 @@ def test_del_records(): with pytest.raises(ValueError): a.del_records([TopicPartition("test-topic1")]) - From 7336986cbd3ccda3bbc6667b534b53459476182e Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 15 Feb 2024 00:18:55 +0530 Subject: [PATCH 03/47] Corrected another semaphore errors --- examples/adminapi.py | 8 ++++---- src/confluent_kafka/admin/__init__.py | 8 ++++---- tests/integration/admin/test_del_records.py | 10 +++++----- tests/test_Admin.py | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 
2c8939bb6..8fc7f0c93 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -863,19 +863,19 @@ def example_del_records(a, args): topic_partition_offset.append(TopicPartition(topic, partition, offset)) i += 3 partition_i += 1 - + futmap = a.del_records(topic_partition_offset) for partition, fut in futmap.items(): try: result = fut.result() - print("Deleted before offset : {} in topicname : {} partition : {}".format(result.offset, - partition.topic, partition.partition)) + print("Deleted before offset : {} in topicname : {} partition : {}".format(result.offset, + partition.topic, partition.partition)) except KafkaException as e: print("Error in deleting Topicname : {} Partition_Index : {} Offset :{} Error : {}" .format(partition.topic, partition.partition, partition.offset, e)) except Exception: raise - + if __name__ == '__main__': if len(sys.argv) < 3: diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index eccef1f20..4bd36ce88 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1249,9 +1249,9 @@ def del_records(self, topic_partition_offsets_list, **kwargs): """ AdminClient._check_del_records(topic_partition_offsets_list) - f,futmap = AdminClient._make_futures_v2(topic_partition_offsets_list, - _TopicPartition, - AdminClient._make_futmap_result_from_list) - + f,futmap = AdminClient._make_futures_v2(topic_partition_offsets_list, + _TopicPartition, + AdminClient._make_futmap_result_from_list) + super(AdminClient, self).del_records(topic_partition_offsets_list, f, **kwargs) return futmap diff --git a/tests/integration/admin/test_del_records.py b/tests/integration/admin/test_del_records.py index c59d1c571..22038a4cb 100644 --- a/tests/integration/admin/test_del_records.py +++ b/tests/integration/admin/test_del_records.py @@ -43,22 +43,22 @@ def test_del_records(kafka_cluster): topic_partition_offset = TopicPartition(topic, 0, 2) kwargs = {"isolation_level": IsolationLevel.READ_UNCOMMITTED} - + # Delete the records fs1 = admin_client.del_records([topic_partition_offset]) earliest_offset_available = 0 # Find the earliest avilable offset for that specific topic partition - fs2 = admin_client.list_offsets(requests,**kwargs) + fs2 = admin_client.list_offsets(requests, **kwargs) for _, fut in fs2.items(): result = fut.result() - assert isinstance(result,ListOffsetsResultInfo) + assert isinstance(result, ListOffsetsResultInfo) earliest_offset_available = result.offset - #Check if the earliest available offset is equal to the offset passes to the delete records function + # Check if the earliest available offset is equal to the offset passes to the delete records function for _, fut in fs1.items(): result = fut.result() - assert isinstance(result,TopicPartition) + assert isinstance(result, TopicPartition) assert (result.offset == earliest_offset_available) # Delete created topic diff --git a/tests/test_Admin.py b/tests/test_Admin.py index d48408aba..910fb7b5c 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1173,14 +1173,14 @@ def test_list_offsets_api(): def test_del_records(): a = AdminClient({"socket.timeout.ms": 10}) - + # Request-type tests with pytest.raises(TypeError): a.del_records(None) with pytest.raises(TypeError): a.del_records(1) - + # Request-specific tests with pytest.raises(TypeError): a.del_records(["test-1"]) From 44177336872e9c2815fd25544b9da0343189d138 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 15 Feb 2024 17:13:57 +0530 Subject: [PATCH 04/47] Corrected yet 
another semaphore errors --- examples/adminapi.py | 20 ++++++++++---------- src/confluent_kafka/admin/__init__.py | 5 ++--- tests/test_Admin.py | 2 +- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 8fc7f0c93..33234e05a 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -808,24 +808,24 @@ def example_list_offsets(a, args): f"Invalid number of arguments for list offsets, partition {partition_i}, expected 3," + f" got {len(args) - i}") topic = args[i] - partition = int(args[i+1]) + partition = int(args[i + 1]) topic_partition = TopicPartition(topic, partition) - if "EARLIEST" == args[i+2]: + if "EARLIEST" == args[i + 2]: offset_spec = OffsetSpec.earliest() - elif "LATEST" == args[i+2]: + elif "LATEST" == args[i + 2]: offset_spec = OffsetSpec.latest() - elif "MAX_TIMESTAMP" == args[i+2]: + elif "MAX_TIMESTAMP" == args[i + 2]: offset_spec = OffsetSpec.max_timestamp() - elif "TIMESTAMP" == args[i+2]: + elif "TIMESTAMP" == args[i + 2]: if i + 4 > len(args): raise ValueError( f"Invalid number of arguments for list offsets, partition {partition_i}, expected 4" + f", got {len(args) - i}") - offset_spec = OffsetSpec.for_timestamp(int(args[i+3])) + offset_spec = OffsetSpec.for_timestamp(int(args[i + 3])) i += 1 else: raise ValueError("Invalid OffsetSpec, must be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP") @@ -863,19 +863,19 @@ def example_del_records(a, args): topic_partition_offset.append(TopicPartition(topic, partition, offset)) i += 3 partition_i += 1 - + futmap = a.del_records(topic_partition_offset) for partition, fut in futmap.items(): try: result = fut.result() - print("Deleted before offset : {} in topicname : {} partition : {}".format(result.offset, - partition.topic, partition.partition)) + print("Deleted before offset : {} in topicname : {} partition : {}".format( + result.offset, partition.topic, partition.partition)) except KafkaException as e: print("Error in deleting Topicname : {} Partition_Index : {} Offset :{} Error : {}" .format(partition.topic, partition.partition, partition.offset, e)) except Exception: raise - + if __name__ == '__main__': if len(sys.argv) < 3: diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 4bd36ce88..80f292d11 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1249,9 +1249,8 @@ def del_records(self, topic_partition_offsets_list, **kwargs): """ AdminClient._check_del_records(topic_partition_offsets_list) - f,futmap = AdminClient._make_futures_v2(topic_partition_offsets_list, - _TopicPartition, - AdminClient._make_futmap_result_from_list) + f,futmap = AdminClient._make_futures_v2( + topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list) super(AdminClient, self).del_records(topic_partition_offsets_list, f, **kwargs) return futmap diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 910fb7b5c..1f8e5fc23 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1173,7 +1173,7 @@ def test_list_offsets_api(): def test_del_records(): a = AdminClient({"socket.timeout.ms": 10}) - + # Request-type tests with pytest.raises(TypeError): a.del_records(None) From f7952b6038fe8a81c57101aadea1caf3a60326d6 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 15 Feb 2024 17:26:10 +0530 Subject: [PATCH 05/47] Corrected more semaphore errors --- src/confluent_kafka/admin/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 80f292d11..3832ad4ef 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1249,7 +1249,7 @@ def del_records(self, topic_partition_offsets_list, **kwargs): """ AdminClient._check_del_records(topic_partition_offsets_list) - f,futmap = AdminClient._make_futures_v2( + f, futmap = AdminClient._make_futures_v2( topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list) super(AdminClient, self).del_records(topic_partition_offsets_list, f, **kwargs) From 6293a56d60313a38ed5f41ff6d22e4d9a9ddb0b4 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 27 Feb 2024 11:10:58 +0530 Subject: [PATCH 06/47] Minor changes --- examples/adminapi.py | 9 ++++----- tests/integration/admin/test_del_records.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 33234e05a..d4e11b100 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -857,9 +857,9 @@ def example_del_records(a, args): raise ValueError( f"Invalid number of arguments for del_records, partition {partition_i}, expected 3," + f" got {len(args) - i}") - topic = args[0] - partition = int(args[1]) - offset = int(args[2]) + topic = args[i] + partition = int(args[i + 1]) + offset = int(args[i + 2]) topic_partition_offset.append(TopicPartition(topic, partition, offset)) i += 3 partition_i += 1 @@ -915,8 +915,7 @@ def example_del_records(a, args): ' DELETE ..]\n') sys.stderr.write(' list_offsets ' + '[ ..]\n') - sys.stderr.write(' del_records ' + - '[ ]\n') + sys.stderr.write(' del_records ...\n' ) sys.exit(1) broker = sys.argv[1] diff --git a/tests/integration/admin/test_del_records.py b/tests/integration/admin/test_del_records.py index 22038a4cb..f7eb90dd9 100644 --- a/tests/integration/admin/test_del_records.py +++ b/tests/integration/admin/test_del_records.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2023 Confluent Inc. +# Copyright 2024 Confluent Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
From b4a1f08db3fc32365cf534337542b92b4023a541 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 27 Feb 2024 11:31:11 +0530 Subject: [PATCH 07/47] Whitespace error --- examples/adminapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index d4e11b100..5abdfdb10 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -915,7 +915,7 @@ def example_del_records(a, args): ' DELETE ..]\n') sys.stderr.write(' list_offsets ' + '[ ..]\n') - sys.stderr.write(' del_records ...\n' ) + sys.stderr.write(' del_records ...\n') sys.exit(1) broker = sys.argv[1] From 900540e112a5177d2cfa6879cd8ee9cca6d60631 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 29 Feb 2024 10:55:22 +0530 Subject: [PATCH 08/47] Requested changes --- examples/adminapi.py | 37 ++++++++----------- src/confluent_kafka/admin/__init__.py | 24 ++++++------ src/confluent_kafka/src/Admin.c | 18 ++++----- ..._del_records.py => test_delete_records.py} | 6 +-- tests/test_Admin.py | 14 +++---- 5 files changed, 46 insertions(+), 53 deletions(-) rename tests/integration/admin/{test_del_records.py => test_delete_records.py} (93%) diff --git a/examples/adminapi.py b/examples/adminapi.py index 5abdfdb10..3368f6770 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -845,34 +845,29 @@ def example_list_offsets(a, args): .format(partition.topic, partition.partition, e)) -def example_del_records(a, args): +def example_delete_records(a, args): topic_partition_offset = [] if len(args) == 0: raise ValueError( "Invalid number of arguments for list offsets, expected at least 1, got 0") - i = 0 - partition_i = 1 - while i < len(args): - if i + 3 > len(args): - raise ValueError( - f"Invalid number of arguments for del_records, partition {partition_i}, expected 3," + - f" got {len(args) - i}") - topic = args[i] - partition = int(args[i + 1]) - offset = int(args[i + 2]) - topic_partition_offset.append(TopicPartition(topic, partition, offset)) - i += 3 - partition_i += 1 + if len(args) % 3 != 0: + raise ValueError("Invalid number of arguments for delete_records") + + topic_partition_offset = [ + TopicPartition(topic, int(partition), int(offset)) + for topic, partition, offset in zip(args[::3], args[1::3], args[2::3]) + ] - futmap = a.del_records(topic_partition_offset) + futmap = a.delete_records(topic_partition_offset) for partition, fut in futmap.items(): try: result = fut.result() - print("Deleted before offset : {} in topicname : {} partition : {}".format( - result.offset, partition.topic, partition.partition)) + print( + f"All records deleted before offset {partition.offset} in topic {partition.topic}"+ + f"partition {partition.partition}. 
The minimum offset in this partition is now {result.offset}") except KafkaException as e: - print("Error in deleting Topicname : {} Partition_Index : {} Offset :{} Error : {}" - .format(partition.topic, partition.partition, partition.offset, e)) + print( + f"Error deleting records in topic {partition.topic} partition {partition.partition}: {e}") except Exception: raise @@ -915,7 +910,7 @@ def example_del_records(a, args): ' DELETE ..]\n') sys.stderr.write(' list_offsets ' + '[ ..]\n') - sys.stderr.write(' del_records ...\n') + sys.stderr.write(' delete_records ...\n') sys.exit(1) broker = sys.argv[1] @@ -946,7 +941,7 @@ def example_del_records(a, args): 'describe_user_scram_credentials': example_describe_user_scram_credentials, 'alter_user_scram_credentials': example_alter_user_scram_credentials, 'list_offsets': example_list_offsets, - 'del_records': example_del_records} + 'delete_records': example_delete_records} if operation not in opsmap: sys.stderr.write('Unknown operation: %s\n' % operation) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 3832ad4ef..661281cef 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -535,24 +535,18 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs): raise TypeError("isolation_level argument should be an IsolationLevel") @staticmethod - def _check_del_records(request): + def _check_delete_records(request): if request is None: raise TypeError("request cannot be None") if not isinstance(request, list): raise TypeError("request must be a list") for req in request: if not isinstance(req, _TopicPartition): - raise TypeError("Element of the list must be of type 'TopicPartitions' ") + raise TypeError("Element of the request list must be of type 'TopicPartition' ") if req is None: - raise ValueError("'topic_partitions' cannot be null") - if req.topic is None: - raise TypeError("Elements of the list must not have topic attribute as None") - if not req.topic: - raise ValueError("Elements of the list must not have topic attrubute empty") + raise ValueError("Individual request in the request list cannot be 'None'") if req.partition < 0: raise ValueError("Elements of the list must not have negative value for 'partition' field") - if req.offset < 0: - raise ValueError("Elements of the list must not have negative value for offset field") def create_topics(self, new_topics, **kwargs): """ @@ -1225,9 +1219,9 @@ def list_offsets(self, topic_partition_offsets, **kwargs): super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs) return futmap - def del_records(self, topic_partition_offsets_list, **kwargs): + def delete_records(self, topic_partition_offsets_list, **kwargs): """ - Helps to delete all the records before the specified offset, + Deletes all the records before the specified offset, in the specified Topic and Partition. :param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects @@ -1235,6 +1229,10 @@ def del_records(self, topic_partition_offsets_list, **kwargs): :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` + :param float operation_timeout: The operation timeout in seconds, + controlling how long the DeleteRecords request will block + on the broker waiting for the partition creation to propagate + in the cluster. A value of 0 returns immediately. 
Default: 0 :returns: A dict of futures keyed by the TopicPartition. The future result() method returns a TopicPartition list indicating that @@ -1247,10 +1245,10 @@ def del_records(self, topic_partition_offsets_list, **kwargs): :raises TypeError: Invalid input type. :raises ValueError: Invalid input value. """ - AdminClient._check_del_records(topic_partition_offsets_list) + AdminClient._check_delete_records(topic_partition_offsets_list) f, futmap = AdminClient._make_futures_v2( topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list) - super(AdminClient, self).del_records(topic_partition_offsets_list, f, **kwargs) + super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs) return futmap diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index f3d292937..d1555070b 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -2966,7 +2966,7 @@ const char Admin_list_offsets_doc[] = PyDoc_STR( /** * @brief Delete records */ -PyObject* Admin_del_records (Handle *self,PyObject *args,PyObject *kwargs){ +PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ PyObject* topic_partition_offset, *future; int del_record_cnt = 1; rd_kafka_DeleteRecords_t **c_obj = NULL; @@ -3049,12 +3049,12 @@ PyObject* Admin_del_records (Handle *self,PyObject *args,PyObject *kwargs){ } -const char Admin_del_records_doc[] = PyDoc_STR( - ".. py:function:: del_records(topic_partitions, future, [request_timeout])\n" +const char Admin_delete_records_doc[] = PyDoc_STR( + ".. py:function:: delete_records(topic_partitions, future, [request_timeout])\n" "\n" " Delete records for the particular topic partition before the specified offset provided in the request.\n" "\n" - " This method should not be used directly, use confluent_kafka.AdminClient.del_records()\n"); + " This method should not be used directly, use confluent_kafka.AdminClient.delete_records()\n"); /** * @brief Call rd_kafka_poll() and keep track of crashing callbacks. 
@@ -3218,8 +3218,8 @@ static PyMethodDef Admin_methods[] = { Admin_list_offsets_doc }, - { "del_records", (PyCFunction)Admin_del_records, METH_VARARGS|METH_KEYWORDS, - Admin_del_records_doc + { "delete_records", (PyCFunction)Admin_delete_records, METH_VARARGS|METH_KEYWORDS, + Admin_delete_records_doc }, { NULL } @@ -4811,10 +4811,10 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, case RD_KAFKA_EVENT_DELETERECORDS_RESULT: { - const rd_kafka_DeleteRecords_result_t *c_del_records_res = rd_kafka_event_DeleteRecords_result(rkev); - const rd_kafka_topic_partition_list_t *c_del_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_del_records_res); + const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); + const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); - result = c_parts_to_py(c_del_records_res_list); + result = c_parts_to_py(c_delete_records_res_list); break; } diff --git a/tests/integration/admin/test_del_records.py b/tests/integration/admin/test_delete_records.py similarity index 93% rename from tests/integration/admin/test_del_records.py rename to tests/integration/admin/test_delete_records.py index f7eb90dd9..3630ce347 100644 --- a/tests/integration/admin/test_del_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -17,9 +17,9 @@ from confluent_kafka import TopicPartition, IsolationLevel -def test_del_records(kafka_cluster): +def test_delete_records(kafka_cluster): """ - Test del_records, delete the records upto the specified offset + Test delete_records, delete the records upto the specified offset in that particular partition of the specified topic. """ admin_client = kafka_cluster.admin() @@ -45,7 +45,7 @@ def test_del_records(kafka_cluster): kwargs = {"isolation_level": IsolationLevel.READ_UNCOMMITTED} # Delete the records - fs1 = admin_client.del_records([topic_partition_offset]) + fs1 = admin_client.delete_records([topic_partition_offset]) earliest_offset_available = 0 # Find the earliest avilable offset for that specific topic partition diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 1f8e5fc23..d9da7fbcd 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1171,25 +1171,25 @@ def test_list_offsets_api(): a.list_offsets(requests, **kwargs) -def test_del_records(): +def test_delete_records(): a = AdminClient({"socket.timeout.ms": 10}) # Request-type tests with pytest.raises(TypeError): - a.del_records(None) + a.delete_records(None) with pytest.raises(TypeError): - a.del_records(1) + a.delete_records(1) # Request-specific tests with pytest.raises(TypeError): - a.del_records(["test-1"]) + a.delete_records(["test-1"]) with pytest.raises(TypeError): - a.del_records([TopicPartition(None)]) + a.delete_records([TopicPartition(None)]) with pytest.raises(ValueError): - a.del_records([TopicPartition("")]) + a.delete_records([TopicPartition("")]) with pytest.raises(ValueError): - a.del_records([TopicPartition("test-topic1")]) + a.delete_records([TopicPartition("test-topic1")]) From 49ced9538e250aff029dcef4a9473d3cae1e477e Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 29 Feb 2024 11:01:14 +0530 Subject: [PATCH 09/47] Whitespace error --- examples/adminapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 3368f6770..7bb666ec0 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -863,7 +863,7 @@ def 
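Patch 08's rewrite of example_delete_records replaces the index-juggling while loop with stride-3 slicing: args[::3], args[1::3] and args[2::3] pick out the topics, partitions and offsets respectively, and zip() walks them in lockstep. A small self-contained sketch of that idiom, using made-up argument values:

    # Flat CLI argument list: topic, partition, offset, repeated per record.
    args = ["topic-A", "0", "5", "topic-B", "1", "10"]

    # zip() over the three stride-3 slices yields one triple per record,
    # which mirrors what the list comprehension in the patch consumes.
    triples = list(zip(args[::3], args[1::3], args[2::3]))
    assert triples == [("topic-A", "0", "5"), ("topic-B", "1", "10")]
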
From 49ced9538e250aff029dcef4a9473d3cae1e477e Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Thu, 29 Feb 2024 11:01:14 +0530
Subject: [PATCH 09/47] Whitespace error

---
 examples/adminapi.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/adminapi.py b/examples/adminapi.py
index 3368f6770..7bb666ec0 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -863,7 +863,7 @@ def example_delete_records(a, args):
         try:
             result = fut.result()
             print(
-                f"All records deleted before offset {partition.offset} in topic {partition.topic}"+
+                f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
                 f"partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
         except KafkaException as e:
             print(

From 5e67ea3705b7f8e0745f4801c6f9836c93cdd292 Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Thu, 29 Feb 2024 12:01:13 +0530
Subject: [PATCH 10/47] Minor changes

---
 examples/adminapi.py                            | 2 +-
 tests/integration/admin/test_delete_records.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/examples/adminapi.py b/examples/adminapi.py
index 7bb666ec0..d0a0513ae 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -849,7 +849,7 @@ def example_delete_records(a, args):
     topic_partition_offset = []
     if len(args) == 0:
         raise ValueError(
-            "Invalid number of arguments for list offsets, expected at least 1, got 0")
+            "Invalid number of arguments for delete_records, expected at least 1, got 0")
     if len(args) % 3 != 0:
         raise ValueError("Invalid number of arguments for delete_records")
diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py
index 3630ce347..063af1550 100644
--- a/tests/integration/admin/test_delete_records.py
+++ b/tests/integration/admin/test_delete_records.py
@@ -48,14 +48,14 @@ def test_delete_records(kafka_cluster):
     fs1 = admin_client.delete_records([topic_partition_offset])
     earliest_offset_available = 0

-    # Find the earliest avilable offset for that specific topic partition
+    # Find the earliest available offset for that specific topic partition
     fs2 = admin_client.list_offsets(requests, **kwargs)
     for _, fut in fs2.items():
         result = fut.result()
         assert isinstance(result, ListOffsetsResultInfo)
         earliest_offset_available = result.offset

-    # Check if the earliest available offset is equal to the offset passes to the delete records function
+    # Check if the earliest available offset is equal to the offset passed to the delete records function
     for _, fut in fs1.items():
         result = fut.result()
         assert isinstance(result, TopicPartition)

From 684efd899f24802645e46b02e435bc8e859946b4 Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Fri, 1 Mar 2024 15:06:53 +0530
Subject: [PATCH 11/47] Requested changes

---
 examples/adminapi.py                            |  5 +++--
 src/confluent_kafka/src/Admin.c                 | 22 +++++++++++--------
 .../integration/admin/test_delete_records.py   | 16 +++++++++-----
 3 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/examples/adminapi.py b/examples/adminapi.py
index d0a0513ae..52e5197a3 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -849,7 +849,7 @@ def example_delete_records(a, args):
     topic_partition_offset = []
     if len(args) == 0:
         raise ValueError(
-            "Invalid number of arguments for delete_records, expected at least 1, got 0")
+            "Invalid number of arguments for delete_records, expected at least 3, got 0")
     if len(args) % 3 != 0:
         raise ValueError("Invalid number of arguments for delete_records")
@@ -867,7 +867,8 @@ def example_delete_records(a, args):
                 f"partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
         except KafkaException as e:
             print(
-                f"Error deleting records in topic {partition.topic} partition {partition.partition}: {e}")
+                f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
+                f" before offset {partition.offset}: {e}")
         except Exception:
             raise
diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c
index d1555070b..a4008eb6b 100644
--- a/src/confluent_kafka/src/Admin.c
+++ b/src/confluent_kafka/src/Admin.c
@@ -2967,7 +2967,7 @@ const char Admin_list_offsets_doc[] = PyDoc_STR(
  * @brief Delete records
  */
 PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){
-        PyObject* topic_partition_offset, *future;
+        PyObject *topic_partition_offset = NULL, *future;
         int del_record_cnt = 1;
         rd_kafka_DeleteRecords_t **c_obj = NULL;
         struct Admin_options options = Admin_options_INITIALIZER;
@@ -2980,12 +2980,14 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){
                               "future",
                               /* options */
                               "request_timeout",
+                              "operation_timeout",
                               NULL};

-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|f", kws,
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|ff", kws,
                                          &topic_partition_offset,
                                          &future,
-                                         &options.request_timeout)) {
+                                         &options.request_timeout,
+                                         &options.operation_timeout)) {
                 goto err;
         }
@@ -3023,16 +3025,18 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){
         CallState_begin(self, &cs);
         rd_kafka_DeleteRecords(self->rk,c_obj,del_record_cnt,c_options,rkqu);
         CallState_end(self,&cs);
+
         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
+
+        rd_kafka_AdminOptions_destroy(c_options);
         rd_kafka_DeleteRecords_destroy_array(c_obj,del_record_cnt);
         free(c_obj);
-        Py_XDECREF(topic_partition_offset);
-        rd_kafka_AdminOptions_destroy(c_options);
-        rd_kafka_topic_partition_list_destroy(c_topic_partition_offset);
+
+        rd_kafka_topic_partition_list_destroy(c_topic_partition_offset);
+
+        Py_XDECREF(topic_partition_offset);

         Py_RETURN_NONE;
 err:
-
         if (c_obj) {
                 rd_kafka_DeleteRecords_destroy_array(c_obj, del_record_cnt);
                 free(c_obj);
@@ -3050,9 +3054,9 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){
 }

 const char Admin_delete_records_doc[] = PyDoc_STR(
-        ".. py:function:: delete_records(topic_partitions, future, [request_timeout])\n"
+        ".. py:function:: delete_records(topic_partitions, future, [request_timeout, operation_timeout])\n"
         "\n"
-        " Delete records for the particular topic partition before the specified offset provided in the request.\n"
+        " Delete all the records for the particular topic partition before the specified offset provided in the request.\n"
         "\n"
         " This method should not be used directly, use confluent_kafka.AdminClient.delete_records()\n");
diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py
index 063af1550..ac4be639c 100644
--- a/tests/integration/admin/test_delete_records.py
+++ b/tests/integration/admin/test_delete_records.py
@@ -33,23 +33,29 @@ def test_delete_records(kafka_cluster):

     # Create Producer instance
     p = kafka_cluster.producer()
-    p.produce(topic, "Message-1",)
+    p.produce(topic, "Message-1")
     p.produce(topic, "Message-2")
     p.produce(topic, "Message-3")
     p.flush()

     topic_partition = TopicPartition(topic, 0)
     requests = {topic_partition: OffsetSpec.earliest()}
-    topic_partition_offset = TopicPartition(topic, 0, 2)

-    kwargs = {"isolation_level": IsolationLevel.READ_UNCOMMITTED}
+    # Check if the earliest avilable offset for this topic partition is 0
+    fs = admin_client.list_offsets(requests)
+    for _, fut in fs.items():
+        result = fut.result()
+        assert isinstance(result, ListOffsetsResultInfo)
+        assert (result.offset == 0)
+
+    topic_partition_offset = TopicPartition(topic, 0, 2)

     # Delete the records
     fs1 = admin_client.delete_records([topic_partition_offset])
     earliest_offset_available = 0

-    # Find the earliest available offset for that specific topic partition
-    fs2 = admin_client.list_offsets(requests, **kwargs)
+    # Find the earliest available offset for that specific topic partition after deletion has been done
+    fs2 = admin_client.list_offsets(requests)
     for _, fut in fs2.items():
         result = fut.result()
         assert isinstance(result, ListOffsetsResultInfo)
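With patch 11 the C binding parses both request_timeout and operation_timeout, so either can be forwarded from Python as a keyword argument. A hedged sketch of a call using both — the timeout values, broker address and topic name are illustrative only:

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker

    futmap = a.delete_records(
        [TopicPartition("mytopic", 0, 2)],
        request_timeout=30.0,    # bounds the whole round trip, in seconds
        operation_timeout=10.0,  # how long the broker may block on the operation
    )
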
from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec -from confluent_kafka import TopicPartition, IsolationLevel +from confluent_kafka import TopicPartition def test_delete_records(kafka_cluster): @@ -47,7 +47,7 @@ def test_delete_records(kafka_cluster): result = fut.result() assert isinstance(result, ListOffsetsResultInfo) assert (result.offset == 0) - + topic_partition_offset = TopicPartition(topic, 0, 2) # Delete the records From 60b8febd66d5f873a0e2204794c19d8514aaaf2f Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 5 Mar 2024 00:39:56 +0530 Subject: [PATCH 13/47] Error handling change in example --- examples/adminapi.py | 10 +++++++--- src/confluent_kafka/admin/__init__.py | 5 +++-- src/confluent_kafka/src/Admin.c | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 52e5197a3..679fab1e9 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -862,9 +862,13 @@ def example_delete_records(a, args): for partition, fut in futmap.items(): try: result = fut.result() - print( - f"All records deleted before offset {partition.offset} in topic {partition.topic}" + - f"partition {partition.partition}. The minimum offset in this partition is now {result.offset}") + if result.err: + print(f"Error deleting records in topic {partition.topic} partition {partition.partition}" + + f" before offset {partition.offset}: {result.err}") + else: + print( + f"All records deleted before offset {partition.offset} in topic {partition.topic}" + + f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}") except KafkaException as e: print( f"Error deleting records in topic {partition.topic} partition {partition.partition}" + diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 661281cef..05878bccb 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1237,9 +1237,10 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): :returns: A dict of futures keyed by the TopicPartition. The future result() method returns a TopicPartition list indicating that deletion operation have been performed till the specified Topic Partition - and error if any has occured. + and error if any has occured. User has to check if any error has occured + during deletion in each partition. - :rtype: dict[str, future] + :rtype: dict[TopicPartition, future] :raises KafkaException: Operation failed locally or on broker. :raises TypeError: Invalid input type. diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index a4008eb6b..0740f2db9 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3023,13 +3023,13 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ * the event_cb may be triggered immediately. 
*/ CallState_begin(self, &cs); - rd_kafka_DeleteRecords(self->rk,c_obj,del_record_cnt,c_options,rkqu); + rd_kafka_DeleteRecords(self->rk,c_obj, del_record_cnt, c_options,rkqu); CallState_end(self,&cs); rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */ rd_kafka_AdminOptions_destroy(c_options); - rd_kafka_DeleteRecords_destroy_array(c_obj,del_record_cnt); + rd_kafka_DeleteRecords_destroy_array(c_obj, del_record_cnt); free(c_obj); rd_kafka_topic_partition_list_destroy(c_topic_partition_offset); From 654e069edabfb13fe867314c6c13da690529997f Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 5 Mar 2024 11:09:38 +0530 Subject: [PATCH 14/47] Minor change in error handling example --- examples/adminapi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 679fab1e9..aed5b66ae 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -862,9 +862,9 @@ def example_delete_records(a, args): for partition, fut in futmap.items(): try: result = fut.result() - if result.err: + if result.error: print(f"Error deleting records in topic {partition.topic} partition {partition.partition}" + - f" before offset {partition.offset}: {result.err}") + f" before offset {partition.offset}: {result.error.str()}") else: print( f"All records deleted before offset {partition.offset} in topic {partition.topic}" + From 86ae2772eee2a0dc42c7c09c86746b1f81000cb0 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 2 May 2024 03:50:05 +0530 Subject: [PATCH 15/47] Requested Changes --- examples/adminapi.py | 12 ++--- src/confluent_kafka/__init__.py | 5 +- src/confluent_kafka/_model/__init__.py | 13 ++++++ src/confluent_kafka/admin/__init__.py | 12 ++--- src/confluent_kafka/src/Admin.c | 46 ++++++++++++++++++- .../integration/admin/test_delete_records.py | 22 +++------ 6 files changed, 75 insertions(+), 35 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index aed5b66ae..6eb9d38ba 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -862,19 +862,13 @@ def example_delete_records(a, args): for partition, fut in futmap.items(): try: result = fut.result() - if result.error: - print(f"Error deleting records in topic {partition.topic} partition {partition.partition}" + - f" before offset {partition.offset}: {result.error.str()}") - else: - print( - f"All records deleted before offset {partition.offset} in topic {partition.topic}" + - f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}") + print( + f"All records deleted before offset {partition.offset} in topic {partition.topic}" + + f" partition {partition.partition}. 
The minimum offset in this partition is now {result.offset}") except KafkaException as e: print( f"Error deleting records in topic {partition.topic} partition {partition.partition}" + f" before offset {partition.offset}: {e}") - except Exception: - raise if __name__ == '__main__': diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index bab73a2c6..7b9883bb9 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -24,7 +24,8 @@ ConsumerGroupState, TopicCollection, TopicPartitionInfo, - IsolationLevel) + IsolationLevel, + DeleteRecordsResult ) from .cimpl import (Producer, Consumer, @@ -49,7 +50,7 @@ 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node', 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid', - 'IsolationLevel'] + 'IsolationLevel', 'DeleteRecordsResult'] __version__ = version()[0] diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 1c2ec89f0..8b9983242 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -149,3 +149,16 @@ def __lt__(self, other): if self.__class__ != other.__class__: return NotImplemented return self.value < other.value + +class DeleteRecordsResult: + """ + DeleteRecordsResult + Result of a `AdminClient.delete_records` call associated to a partition. + + Parameters + ---------- + offset: int + The offset returned by the delete_records call. + """ + def __init__(self, offset): + self.offset = offset diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 05878bccb..304bff067 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -543,10 +543,8 @@ def _check_delete_records(request): for req in request: if not isinstance(req, _TopicPartition): raise TypeError("Element of the request list must be of type 'TopicPartition' ") - if req is None: - raise ValueError("Individual request in the request list cannot be 'None'") if req.partition < 0: - raise ValueError("Elements of the list must not have negative value for 'partition' field") + raise ValueError(" 'partition' cannot be negative") def create_topics(self, new_topics, **kwargs): """ @@ -1235,10 +1233,8 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): in the cluster. A value of 0 returns immediately. Default: 0 :returns: A dict of futures keyed by the TopicPartition. - The future result() method returns a TopicPartition list indicating that - deletion operation have been performed till the specified Topic Partition - and error if any has occured. User has to check if any error has occured - during deletion in each partition. 
+ The future result() method returns DeleteRecordsResult + or raises KafkaException :rtype: dict[TopicPartition, future] @@ -1249,7 +1245,7 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): AdminClient._check_delete_records(topic_partition_offsets_list) f, futmap = AdminClient._make_futures_v2( - topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list) + topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result) super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs) return futmap diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 0740f2db9..f7e1198fd 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4470,6 +4470,50 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset return NULL; } +static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) { + + size_t c_topic_partition_cnt = c_topic_partitions->cnt; + PyObject *result = NULL; + PyObject *DeleteRecordsResult_type = NULL; + size_t i; + + DeleteRecordsResult_type = cfl_PyObject_lookup("confluent_kafka", "DeleteRecordsResult"); + + if(!DeleteRecordsResult_type){ + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecordsResult type"); + return NULL; + } + + result = PyDict_New(); + for(i=0; ielems[i]; + if (c_topic_partitions->elems[i].err) { + value = KafkaError_new_or_None(c_topic_partitions->elems[i].err, rd_kafka_err2str(c_topic_partitions->elems[i].err)); + } else { + PyObject *args = NULL; + PyObject *kwargs = NULL; + kwargs = PyDict_New(); + cfl_PyDict_SetLong(kwargs, "offset", c_topic_partitions->elems[i].offset); + args = PyTuple_New(0); + value = PyObject_Call(DeleteRecordsResult_type, args, kwargs); + Py_DECREF(args); + Py_DECREF(kwargs); + if (value == NULL) + goto raise; + } + PyDict_SetItem(result, c_part_to_py(c_topic_partition), value); + Py_DECREF(value); + } + + Py_DECREF(DeleteRecordsResult_type); + return result; +raise: + Py_DECREF(result); + Py_DECREF(DeleteRecordsResult_type); + return NULL; +} + /** * @brief Event callback triggered from librdkafka's background thread * when Admin API results are ready. @@ -4818,7 +4862,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); - result = c_parts_to_py(c_delete_records_res_list); + result = Admin_c_DeleteRecordsResult_to_py(c_delete_records_res_list); break; } diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index 8ebc0f9b0..d80081093 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
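A note on how these results reach Python: the background event callback above completes the returned futures from librdkafka's background thread, so delete_records() itself returns as soon as the request is queued. A minimal sketch of that behavior, assuming a broker on localhost:9092 and an existing topic (both hypothetical), using the result shape defined at this point in the series (DeleteRecordsResult with an offset field):

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"bootstrap.servers": "localhost:9092"})

    # Returns immediately with a dict of futures keyed by TopicPartition.
    futmap = a.delete_records([TopicPartition("example-topic", 0, 5)])

    # Other work can happen here while the broker performs the deletion.

    for tp, fut in futmap.items():
        # result() blocks until this partition's outcome arrives; a timeout
        # keeps a stuck request from hanging the caller forever.
        res = fut.result(timeout=10)
        print(tp.topic, tp.partition, res.offset)
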
-from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec -from confluent_kafka import TopicPartition +from confluent_kafka.admin import OffsetSpec +from confluent_kafka import TopicPartition, DeleteRecordsResult def test_delete_records(kafka_cluster): @@ -43,29 +43,21 @@ def test_delete_records(kafka_cluster): # Check if the earliest avilable offset for this topic partition is 0 fs = admin_client.list_offsets(requests) - for _, fut in fs.items(): - result = fut.result() - assert isinstance(result, ListOffsetsResultInfo) - assert (result.offset == 0) + result = list(fs.values())[0].result() + assert (result.offset == 0) topic_partition_offset = TopicPartition(topic, 0, 2) # Delete the records fs1 = admin_client.delete_records([topic_partition_offset]) - earliest_offset_available = 0 # Find the earliest available offset for that specific topic partition after deletion has been done fs2 = admin_client.list_offsets(requests) - for _, fut in fs2.items(): - result = fut.result() - assert isinstance(result, ListOffsetsResultInfo) - earliest_offset_available = result.offset # Check if the earliest available offset is equal to the offset passed to the delete records function - for _, fut in fs1.items(): - result = fut.result() - assert isinstance(result, TopicPartition) - assert (result.offset == earliest_offset_available) + res = list(fs1.values())[0].result() + assert isinstance(res, DeleteRecordsResult) + assert (res.offset == list(fs2.values())[0].result().offset) # Delete created topic fs = admin_client.delete_topics([topic]) From 5afe007184b8aacbf4635bcb1ddb895384a93fb1 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 2 May 2024 04:12:40 +0530 Subject: [PATCH 16/47] semaphore errors corrected --- src/confluent_kafka/__init__.py | 2 +- src/confluent_kafka/_model/__init__.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 7b9883bb9..7ba4a59cb 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -25,7 +25,7 @@ TopicCollection, TopicPartitionInfo, IsolationLevel, - DeleteRecordsResult ) + DeleteRecordsResult) from .cimpl import (Producer, Consumer, diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 8b9983242..3b9c7d8a7 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -150,6 +150,7 @@ def __lt__(self, other): return NotImplemented return self.value < other.value + class DeleteRecordsResult: """ DeleteRecordsResult From d7f37db8b3bf919008b98612077851fd480beda8 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:20:53 +0530 Subject: [PATCH 17/47] Update examples/adminapi.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- examples/adminapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 6eb9d38ba..66b23e541 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -849,7 +849,7 @@ def example_delete_records(a, args): topic_partition_offset = [] if len(args) == 0: raise ValueError( - "Invalid number of arguments for delete_records, expected at least 3, got 0") + "Invalid number of arguments for delete_records, expected at least 3 (Usage: delete_records [ ...])") if len(args) % 3 != 0: raise ValueError("Invalid number of arguments for delete_records") From 5e4cd7d05c2ff405418e274f91e78cd2a0307f66 Mon 
Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:21:17 +0530 Subject: [PATCH 18/47] Update examples/adminapi.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- examples/adminapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 66b23e541..f0e171a9f 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -909,7 +909,7 @@ def example_delete_records(a, args): ' DELETE ..]\n') sys.stderr.write(' list_offsets ' + '[ ..]\n') - sys.stderr.write(' delete_records ...\n') + sys.stderr.write(' delete_records [ ...]\n') sys.exit(1) broker = sys.argv[1] From 50443bcee6d979d36f0bbfa830eb37b0933fe2e4 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:21:38 +0530 Subject: [PATCH 19/47] Update src/confluent_kafka/_model/__init__.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/_model/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 3b9c7d8a7..087ee083a 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -159,7 +159,7 @@ class DeleteRecordsResult: Parameters ---------- offset: int - The offset returned by the delete_records call. + The "low watermark" for the topic partition on which the deletion was executed. """ def __init__(self, offset): self.offset = offset From bd64ea626ba11cba6ba81f85d8c0394092ea16a9 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:22:03 +0530 Subject: [PATCH 20/47] Update src/confluent_kafka/admin/__init__.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/admin/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 304bff067..620583cdd 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -544,7 +544,7 @@ def _check_delete_records(request): if not isinstance(req, _TopicPartition): raise TypeError("Element of the request list must be of type 'TopicPartition' ") if req.partition < 0: - raise ValueError(" 'partition' cannot be negative") + raise ValueError("'partition' cannot be negative") def create_topics(self, new_topics, **kwargs): """ From fddaa6a1d90904d0a2db2f34d94b1daa9311ca08 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:22:26 +0530 Subject: [PATCH 21/47] Update src/confluent_kafka/admin/__init__.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/admin/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 620583cdd..c9ac08c52 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -539,7 +539,7 @@ def _check_delete_records(request): if request is None: raise TypeError("request cannot be None") if not isinstance(request, list): - raise TypeError("request must be a list") + raise TypeError("Request must be a list") for req in request: if not isinstance(req, _TopicPartition): raise TypeError("Element of the 
request list must be of type 'TopicPartition' ") From 58ac04c77d1afa63a1340681ab074b5fd24d5a95 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:22:44 +0530 Subject: [PATCH 22/47] Update src/confluent_kafka/src/Admin.c Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/src/Admin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index f7e1198fd..103243a95 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4481,7 +4481,7 @@ static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partiti if(!DeleteRecordsResult_type){ cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecordsResult type"); - return NULL; + goto raise; } result = PyDict_New(); From 9ba4168e199ed6e6ac35a831ca1ed88e8ef75708 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:22:57 +0530 Subject: [PATCH 23/47] Update src/confluent_kafka/src/Admin.c Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/src/Admin.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 103243a95..0aad14562 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4509,8 +4509,8 @@ static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partiti Py_DECREF(DeleteRecordsResult_type); return result; raise: - Py_DECREF(result); - Py_DECREF(DeleteRecordsResult_type); + Py_XDECREF(result); + Py_XDECREF(DeleteRecordsResult_type); return NULL; } From 8f7f2a688ac1aa93453bb79636e59d59898d213a Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:23:39 +0530 Subject: [PATCH 24/47] Update src/confluent_kafka/admin/__init__.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/admin/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index c9ac08c52..85975e4d7 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1233,7 +1233,7 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): in the cluster. A value of 0 returns immediately. Default: 0 :returns: A dict of futures keyed by the TopicPartition. 
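Given the contract stated above, a successful future yields a result object while a failed one raises, so a mixed batch can be handled per partition. A hedged sketch, with broker address and topic names hypothetical:

    from confluent_kafka import KafkaException, TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"bootstrap.servers": "localhost:9092"})
    futmap = a.delete_records([
        TopicPartition("existing-topic", 0, 10),
        TopicPartition("no-such-topic", 0, 10),  # expected to fail broker-side
    ])
    for tp, fut in futmap.items():
        try:
            fut.result()
            print(f"{tp.topic}[{tp.partition}]: records before offset {tp.offset} deleted")
        except KafkaException as e:
            print(f"{tp.topic}[{tp.partition}]: {e}")
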
- The future result() method returns DeleteRecordsResult + The future result() method returns DeletedRecords or raises KafkaException :rtype: dict[TopicPartition, future] From 9f3152de2946617acc99e660b2d65a08a76b9d57 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:24:19 +0530 Subject: [PATCH 25/47] Update src/confluent_kafka/__init__.py Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 7ba4a59cb..4de4db661 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -50,7 +50,7 @@ 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node', 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid', - 'IsolationLevel', 'DeleteRecordsResult'] + 'IsolationLevel', 'DeletedRecords'] __version__ = version()[0] From e1766263dcf6978a7c44b4521c8bb26e631ea4ad Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:24:56 +0530 Subject: [PATCH 26/47] Update src/confluent_kafka/src/Admin.c Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/src/Admin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 0aad14562..6bb7340ea 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4470,7 +4470,7 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset return NULL; } -static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) { +static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) { size_t c_topic_partition_cnt = c_topic_partitions->cnt; PyObject *result = NULL; From fb202c851ad21e1d6672c29412a0e83db7f540a3 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Wed, 22 May 2024 16:25:30 +0530 Subject: [PATCH 27/47] Update src/confluent_kafka/src/Admin.c Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> --- src/confluent_kafka/src/Admin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 6bb7340ea..2316f7246 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3023,7 +3023,7 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ * the event_cb may be triggered immediately. 
*/ CallState_begin(self, &cs); - rd_kafka_DeleteRecords(self->rk,c_obj, del_record_cnt, c_options,rkqu); + rd_kafka_DeleteRecords(self->rk, c_obj, del_record_cnt, c_options, rkqu); CallState_end(self,&cs); rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */ From e240e2fcab2f1569a33c9fedaef83b61d0727e32 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 22 May 2024 17:43:34 +0530 Subject: [PATCH 28/47] requested changes --- CHANGELOG.md | 2 ++ examples/adminapi.py | 5 ++--- src/confluent_kafka/__init__.py | 2 +- src/confluent_kafka/_model/__init__.py | 10 ++++----- src/confluent_kafka/admin/__init__.py | 18 +++++++-------- src/confluent_kafka/src/Admin.c | 22 ++++++++++--------- .../integration/admin/test_delete_records.py | 6 ++--- 7 files changed, 33 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b81d9e16..53a5c6b4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ v2.4.1 is a maintenance release with the following fixes and enhancements: - Removed usage of `strcpy` to enhance security of the client (#1745) - Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745) + - [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710) + - Fixed documentation for default value of operation_timeout in create_topics, delete_topics, create_partitions apis (#1710) confluent-kafka-python is based on librdkafka v2.4.1, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1) diff --git a/examples/adminapi.py b/examples/adminapi.py index f0e171a9f..29b5446f4 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -846,12 +846,11 @@ def example_list_offsets(a, args): def example_delete_records(a, args): - topic_partition_offset = [] if len(args) == 0: raise ValueError( "Invalid number of arguments for delete_records, expected at least 3 (Usage: delete_records [ ...])") if len(args) % 3 != 0: - raise ValueError("Invalid number of arguments for delete_records") + raise ValueError("Invalid number of arguments for delete_records (Usage: delete_records [ ...])") topic_partition_offset = [ TopicPartition(topic, int(partition), int(offset)) @@ -864,7 +863,7 @@ def example_delete_records(a, args): result = fut.result() print( f"All records deleted before offset {partition.offset} in topic {partition.topic}" + - f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}") + f" partition {partition.partition}. 
The minimum offset in this partition is now {result.low_watermark}") except KafkaException as e: print( f"Error deleting records in topic {partition.topic} partition {partition.partition}" + diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 4de4db661..7a6b04d78 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -25,7 +25,7 @@ TopicCollection, TopicPartitionInfo, IsolationLevel, - DeleteRecordsResult) + DeleteRecords) from .cimpl import (Producer, Consumer, diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 087ee083a..609cad8dc 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -151,15 +151,15 @@ def __lt__(self, other): return self.value < other.value -class DeleteRecordsResult: +class DeleteRecords: """ - DeleteRecordsResult + DeleteRecords Result of a `AdminClient.delete_records` call associated to a partition. Parameters ---------- - offset: int + low_watermark: int The "low watermark" for the topic partition on which the deletion was executed. """ - def __init__(self, offset): - self.offset = offset + def __init__(self, low_watermark): + self.low_watermark = low_watermark diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 85975e4d7..dab7bda1b 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -536,13 +536,11 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs): @staticmethod def _check_delete_records(request): - if request is None: - raise TypeError("request cannot be None") if not isinstance(request, list): - raise TypeError("Request must be a list") + raise TypeError(f"Expected Request to be a list, got '{type(request).__name__}' ") for req in request: if not isinstance(req, _TopicPartition): - raise TypeError("Element of the request list must be of type 'TopicPartition' ") + raise TypeError(f"Element of the request list must be of type 'TopicPartition' got '{type(req).__name__}' ") if req.partition < 0: raise ValueError("'partition' cannot be negative") @@ -555,7 +553,7 @@ def create_topics(self, new_topics, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate - in the cluster. A value of 0 returns immediately. Default: 0 + in the cluster. A value of 0 returns immediately. Default: 60 :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -588,7 +586,7 @@ def delete_topics(self, topics, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate - in the cluster. A value of 0 returns immediately. Default: 0 + in the cluster. A value of 0 returns immediately. Default: 60 :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. 
Default: `socket.timeout.ms*1000.0` @@ -626,7 +624,7 @@ def create_partitions(self, new_partitions, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate - in the cluster. A value of 0 returns immediately. Default: 0 + in the cluster. A value of 0 returns immediately. Default: 60 :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -1228,9 +1226,9 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` :param float operation_timeout: The operation timeout in seconds, - controlling how long the DeleteRecords request will block - on the broker waiting for the partition creation to propagate - in the cluster. A value of 0 returns immediately. Default: 0 + controlling how long the delete_records request will block + on the broker waiting for the record deletion to propagate + in the cluster. A value of 0 returns immediately. Default: 60 :returns: A dict of futures keyed by the TopicPartition. The future result() method returns DeletedRecords diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 325e4e7b1..4dd38c595 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4474,13 +4474,13 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li size_t c_topic_partition_cnt = c_topic_partitions->cnt; PyObject *result = NULL; - PyObject *DeleteRecordsResult_type = NULL; + PyObject *DeleteRecords_type = NULL; size_t i; - DeleteRecordsResult_type = cfl_PyObject_lookup("confluent_kafka", "DeleteRecordsResult"); + DeleteRecords_type = cfl_PyObject_lookup("confluent_kafka", "DeleteRecords"); - if(!DeleteRecordsResult_type){ - cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecordsResult type"); + if(!DeleteRecords_type){ + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecords type"); goto raise; } @@ -4494,23 +4494,25 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li PyObject *args = NULL; PyObject *kwargs = NULL; kwargs = PyDict_New(); - cfl_PyDict_SetLong(kwargs, "offset", c_topic_partitions->elems[i].offset); + cfl_PyDict_SetLong(kwargs, "low_watermark", c_topic_partitions->elems[i].offset); args = PyTuple_New(0); - value = PyObject_Call(DeleteRecordsResult_type, args, kwargs); + value = PyObject_Call(DeleteRecords_type, args, kwargs); Py_DECREF(args); Py_DECREF(kwargs); if (value == NULL) goto raise; } - PyDict_SetItem(result, c_part_to_py(c_topic_partition), value); + PyObject *key = c_part_to_py(c_topic_partition); + PyDict_SetItem(result, key, value); + Py_DECREF(key); Py_DECREF(value); } - Py_DECREF(DeleteRecordsResult_type); + Py_DECREF(DeleteRecords_type); return result; raise: Py_XDECREF(result); - Py_XDECREF(DeleteRecordsResult_type); + Py_XDECREF(DeleteRecords_type); return NULL; } @@ -4862,7 +4864,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); const rd_kafka_topic_partition_list_t *c_delete_records_res_list = 
rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); - result = Admin_c_DeleteRecordsResult_to_py(c_delete_records_res_list); + result = Admin_c_DeletedRecords_to_py(c_delete_records_res_list); break; } diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index d80081093..3bf0fe82c 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -14,7 +14,7 @@ # limitations under the License. from confluent_kafka.admin import OffsetSpec -from confluent_kafka import TopicPartition, DeleteRecordsResult +from confluent_kafka import TopicPartition, DeleteRecords def test_delete_records(kafka_cluster): @@ -56,8 +56,8 @@ def test_delete_records(kafka_cluster): # Check if the earliest available offset is equal to the offset passed to the delete records function res = list(fs1.values())[0].result() - assert isinstance(res, DeleteRecordsResult) - assert (res.offset == list(fs2.values())[0].result().offset) + assert isinstance(res, DeleteRecords) + assert (res.low_watermark == list(fs2.values())[0].result().offset) # Delete created topic fs = admin_client.delete_topics([topic]) From cd5cfac67469e451418c80db9915666f15934bbf Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 22 May 2024 17:54:21 +0530 Subject: [PATCH 29/47] semaphore errors --- examples/adminapi.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/adminapi.py b/examples/adminapi.py index 29b5446f4..e85e888f2 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -848,9 +848,12 @@ def example_list_offsets(a, args): def example_delete_records(a, args): if len(args) == 0: raise ValueError( - "Invalid number of arguments for delete_records, expected at least 3 (Usage: delete_records [ ...])") + f"Invalid number of arguments for delete_records, expected at least 3 " + + f"(Usage: delete_records [ ...])") if len(args) % 3 != 0: - raise ValueError("Invalid number of arguments for delete_records (Usage: delete_records [ ...])") + raise ValueError( + f"Invalid number of arguments for delete_records " + + f"(Usage: delete_records [ ...])") topic_partition_offset = [ TopicPartition(topic, int(partition), int(offset)) @@ -863,7 +866,8 @@ def example_delete_records(a, args): result = fut.result() print( f"All records deleted before offset {partition.offset} in topic {partition.topic}" + - f" partition {partition.partition}. The minimum offset in this partition is now {result.low_watermark}") + f" partition {partition.partition}. 
The minimum offset in this partition" +
+                f" is now {result.low_watermark}")
     except KafkaException as e:
         print(
             f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
             f" before offset {partition.offset}: {e}")

From e46e3d6d14dfe4db8a705383254b4dfea2821dfb Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Wed, 22 May 2024 18:03:07 +0530
Subject: [PATCH 30/47] formatting errors

---
 examples/adminapi.py                  | 8 ++++----
 src/confluent_kafka/admin/__init__.py | 3 ++-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/examples/adminapi.py b/examples/adminapi.py
index e85e888f2..92a3559ad 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -848,12 +848,12 @@ def example_list_offsets(a, args):
 def example_delete_records(a, args):
     if len(args) == 0:
         raise ValueError(
-            f"Invalid number of arguments for delete_records, expected at least 3 " +
-            f"(Usage: delete_records [ ...])")
+            "Invalid number of arguments for delete_records, expected at least 3 " +
+            "(Usage: delete_records [ ...])")
     if len(args) % 3 != 0:
         raise ValueError(
-            f"Invalid number of arguments for delete_records " +
-            f"(Usage: delete_records [ ...])")
+            "Invalid number of arguments for delete_records " +
+            "(Usage: delete_records [ ...])")

     topic_partition_offset = [
         TopicPartition(topic, int(partition), int(offset))

diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index dab7bda1b..a09aaf8c8 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -540,7 +540,8 @@ def _check_delete_records(request):
             raise TypeError(f"Expected Request to be a list, got '{type(request).__name__}' ")
         for req in request:
             if not isinstance(req, _TopicPartition):
-                raise TypeError(f"Element of the request list must be of type 'TopicPartition' got '{type(req).__name__}' ")
+                raise TypeError(f"Element of the request list must be of type 'TopicPartition'" +
+                                " got '{type(req).__name__}' ")
             if req.partition < 0:
                 raise ValueError("'partition' cannot be negative")

From 1366b2d67110f7506a0900a5286f3c78e7599d5e Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Wed, 22 May 2024 18:06:05 +0530
Subject: [PATCH 31/47] small change

---
 src/confluent_kafka/admin/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index a09aaf8c8..722f02c1d 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -541,7 +541,7 @@ def _check_delete_records(request):
         for req in request:
             if not isinstance(req, _TopicPartition):
                 raise TypeError(f"Element of the request list must be of type 'TopicPartition'" +
-                                " got '{type(req).__name__}' ")
+                                f" got '{type(req).__name__}' ")
             if req.partition < 0:
                 raise ValueError("'partition' cannot be negative")

From 8ed70cb395a28380b8156f5556b2209cba2e0849 Mon Sep 17 00:00:00 2001
From: PratRanj07
Date: Wed, 22 May 2024 18:08:00 +0530
Subject: [PATCH 32/47] small change

---
 src/confluent_kafka/admin/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index 722f02c1d..bd525e316 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -540,7 +540,7 @@ def _check_delete_records(request):
             raise TypeError(f"Expected Request to be a list, got '{type(request).__name__}' ")
         for req in request:
             if not isinstance(req, _TopicPartition):
-                raise TypeError(f"Element of the request list must be 
of type 'TopicPartition'" + + raise TypeError("Element of the request list must be of type 'TopicPartition'" + f" got '{type(req).__name__}' ") if req.partition < 0: raise ValueError("'partition' cannot be negative") From dc1e05c4dbccfb524b227bfcb984be834de7b451 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Sat, 25 May 2024 00:52:38 +0530 Subject: [PATCH 33/47] requested changes --- CHANGELOG.md | 7 ++++--- src/confluent_kafka/admin/__init__.py | 1 + src/confluent_kafka/src/Admin.c | 22 ++++++++++++---------- tests/test_Admin.py | 7 ++++--- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53a5c6b4d..042605f32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,14 +1,15 @@ # Confluent's Python client for Apache Kafka -## v2.4.1 +## v2.5.0 -v2.4.1 is a maintenance release with the following fixes and enhancements: +v2.5.0 is a feature release with the following features, fixes and enhancements: + - [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710) - Removed usage of `strcpy` to enhance security of the client (#1745) - Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745) - - [KIP-107](https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient) Added delete_records API (#1710) - Fixed documentation for default value of operation_timeout in create_topics, delete_topics, create_partitions apis (#1710) + confluent-kafka-python is based on librdkafka v2.4.1, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1) for a complete list of changes, enhancements, fixes and upgrade considerations. diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index bd525e316..5b5869015 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -545,6 +545,7 @@ def _check_delete_records(request): if req.partition < 0: raise ValueError("'partition' cannot be negative") + def create_topics(self, new_topics, **kwargs): """ Create one or more new topics. 
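The client-side checks above reject malformed requests before anything is sent to the broker. A small sketch of what each guard catches, mirroring the messages asserted in the unit tests that follow:

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"socket.timeout.ms": 10})

    # None and bare strings fail the type checks; a negative partition fails
    # the value check. Each call raises before any network I/O happens.
    for bad_request in (None, ["not-a-TopicPartition"], [TopicPartition("t", -1, 0)]):
        try:
            a.delete_records(bad_request)
        except (TypeError, ValueError) as e:
            print(type(e).__name__, "->", e)
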
diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 4dd38c595..910d191f1 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4470,15 +4470,13 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset return NULL; } -static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) { - - size_t c_topic_partition_cnt = c_topic_partitions->cnt; +static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions, size_t c_topic_partition_cnt) { PyObject *result = NULL; PyObject *DeleteRecords_type = NULL; size_t i; - DeleteRecords_type = cfl_PyObject_lookup("confluent_kafka", "DeleteRecords"); - + DeleteRecords_type = cfl_PyObject_lookup("confluent_kafka", + "DeleteRecords"); if(!DeleteRecords_type){ cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecords type"); goto raise; @@ -4486,15 +4484,20 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li result = PyDict_New(); for(i=0; ielems[i]; - if (c_topic_partitions->elems[i].err) { - value = KafkaError_new_or_None(c_topic_partitions->elems[i].err, rd_kafka_err2str(c_topic_partitions->elems[i].err)); + + key = c_part_to_py(c_topic_partition); + + if (c_topic_partition->err) { + value = KafkaError_new_or_None(c_topic_partition->err, rd_kafka_err2str(c_topic_partition->err)); } else { PyObject *args = NULL; PyObject *kwargs = NULL; kwargs = PyDict_New(); - cfl_PyDict_SetLong(kwargs, "low_watermark", c_topic_partitions->elems[i].offset); + cfl_PyDict_SetLong(kwargs, "low_watermark", c_topic_partition->offset); args = PyTuple_New(0); value = PyObject_Call(DeleteRecords_type, args, kwargs); Py_DECREF(args); @@ -4502,7 +4505,6 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li if (value == NULL) goto raise; } - PyObject *key = c_part_to_py(c_topic_partition); PyDict_SetItem(result, key, value); Py_DECREF(key); Py_DECREF(value); @@ -4864,7 +4866,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); - result = Admin_c_DeletedRecords_to_py(c_delete_records_res_list); + result = Admin_c_DeletedRecords_to_py(c_delete_records_res_list, c_delete_records_res_list->cnt); break; } diff --git a/tests/test_Admin.py b/tests/test_Admin.py index d9da7fbcd..3d0732587 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1175,14 +1175,15 @@ def test_delete_records(): a = AdminClient({"socket.timeout.ms": 10}) # Request-type tests - with pytest.raises(TypeError): + with pytest.raises(TypeError, match = "Expected Request to be a list, got 'NoneType'"): a.delete_records(None) - with pytest.raises(TypeError): + with pytest.raises(TypeError, match = "Expected Request to be a list, got 'int'"): a.delete_records(1) # Request-specific tests - with pytest.raises(TypeError): + with pytest.raises(TypeError, + match = "Element of the request list must be of type 'TopicPartition' got 'str'"): a.delete_records(["test-1"]) with pytest.raises(TypeError): From 8483b12661247b8021cfb093fcd03b5084ed9b1a Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Sat, 25 May 2024 01:02:04 +0530 Subject: [PATCH 34/47] indentation errors --- src/confluent_kafka/admin/__init__.py | 
1 - tests/test_Admin.py | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 5b5869015..bd525e316 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -545,7 +545,6 @@ def _check_delete_records(request): if req.partition < 0: raise ValueError("'partition' cannot be negative") - def create_topics(self, new_topics, **kwargs): """ Create one or more new topics. diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 3d0732587..639e3383a 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1175,15 +1175,15 @@ def test_delete_records(): a = AdminClient({"socket.timeout.ms": 10}) # Request-type tests - with pytest.raises(TypeError, match = "Expected Request to be a list, got 'NoneType'"): + with pytest.raises(TypeError, match="Expected Request to be a list, got 'NoneType'"): a.delete_records(None) - with pytest.raises(TypeError, match = "Expected Request to be a list, got 'int'"): + with pytest.raises(TypeError, match="Expected Request to be a list, got 'int'"): a.delete_records(1) # Request-specific tests - with pytest.raises(TypeError, - match = "Element of the request list must be of type 'TopicPartition' got 'str'"): + with pytest.raises(TypeError, + match="Element of the request list must be of type 'TopicPartition' got 'str'"): a.delete_records(["test-1"]) with pytest.raises(TypeError): From 293a05003141574a82a36070e2f682de7f251124 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Sat, 25 May 2024 01:06:48 +0530 Subject: [PATCH 35/47] indentation errors --- tests/test_Admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 639e3383a..5e8ff2e6c 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1183,7 +1183,7 @@ def test_delete_records(): # Request-specific tests with pytest.raises(TypeError, - match="Element of the request list must be of type 'TopicPartition' got 'str'"): + match="Element of the request list must be of type 'TopicPartition' got 'str'"): a.delete_records(["test-1"]) with pytest.raises(TypeError): From fba0f7b285635985205239596ade792a72987468 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 27 May 2024 15:50:23 +0530 Subject: [PATCH 36/47] Requested changes --- src/confluent_kafka/__init__.py | 2 +- src/confluent_kafka/_model/__init__.py | 6 +- src/confluent_kafka/src/Admin.c | 90 ++++++++++--------- .../integration/admin/test_delete_records.py | 4 +- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index 7a6b04d78..8f0ddad51 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -25,7 +25,7 @@ TopicCollection, TopicPartitionInfo, IsolationLevel, - DeleteRecords) + DeletedRecords) from .cimpl import (Producer, Consumer, diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 609cad8dc..5065bc8ff 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -151,10 +151,10 @@ def __lt__(self, other): return self.value < other.value -class DeleteRecords: +class DeletedRecords: """ - DeleteRecords - Result of a `AdminClient.delete_records` call associated to a partition. + DeletedRecords + Represents information about deleted records. 
Parameters ---------- diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 910d191f1..85cf66f59 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4470,52 +4470,56 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset return NULL; } -static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions, size_t c_topic_partition_cnt) { - PyObject *result = NULL; - PyObject *DeleteRecords_type = NULL; - size_t i; - - DeleteRecords_type = cfl_PyObject_lookup("confluent_kafka", - "DeleteRecords"); - if(!DeleteRecords_type){ - cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeleteRecords type"); - goto raise; - } - - result = PyDict_New(); - for(i=0; ielems[i]; - - key = c_part_to_py(c_topic_partition); - - if (c_topic_partition->err) { - value = KafkaError_new_or_None(c_topic_partition->err, rd_kafka_err2str(c_topic_partition->err)); - } else { - PyObject *args = NULL; - PyObject *kwargs = NULL; - kwargs = PyDict_New(); - cfl_PyDict_SetLong(kwargs, "low_watermark", c_topic_partition->offset); - args = PyTuple_New(0); - value = PyObject_Call(DeleteRecords_type, args, kwargs); - Py_DECREF(args); - Py_DECREF(kwargs); - if (value == NULL) +static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) { + PyObject *result = NULL; + PyObject *DeletedRecords_type = NULL; + + size_t c_topic_partition_cnt = c_topic_partitions->cnt; + size_t i; + + DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka", + "DeletedRecords"); + if(!DeletedRecords_type){ + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeletedRecords type"); goto raise; } - PyDict_SetItem(result, key, value); - Py_DECREF(key); - Py_DECREF(value); - } - Py_DECREF(DeleteRecords_type); - return result; + result = PyDict_New(); + for(i=0; ielems[i]; + key = c_part_to_py(c_topic_partition); + + if (c_topic_partition->err) { + value = KafkaError_new_or_None(c_topic_partition->err, rd_kafka_err2str(c_topic_partition->err)); + } else { + PyObject *args = NULL; + PyObject *kwargs = NULL; + kwargs = PyDict_New(); + cfl_PyDict_SetLong(kwargs, "low_watermark", c_topic_partition->offset); + args = PyTuple_New(0); + value = PyObject_Call(DeletedRecords_type, args, kwargs); + Py_DECREF(args); + Py_DECREF(kwargs); + + if (value == NULL) + goto raise; + } + + PyDict_SetItem(result, key, value); + Py_DECREF(key); + Py_DECREF(value); + } + + Py_DECREF(DeletedRecords_type); + return result; + raise: - Py_XDECREF(result); - Py_XDECREF(DeleteRecords_type); - return NULL; + Py_XDECREF(result); + Py_XDECREF(DeletedRecords_type); + return NULL; } /** @@ -4866,7 +4870,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); - result = Admin_c_DeletedRecords_to_py(c_delete_records_res_list, c_delete_records_res_list->cnt); + result = Admin_c_DeletedRecords_to_py(c_delete_records_res_list); break; } diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index 3bf0fe82c..0e24bece8 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -14,7 +14,7 @@ # limitations under the License. 
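Once the class settles on the DeletedRecords name used in this test, reading a result is plain attribute access. A brief sketch, with broker address and topic hypothetical:

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"bootstrap.servers": "localhost:9092"})
    futmap = a.delete_records([TopicPartition("example-topic", 0, 100)])
    for tp, fut in futmap.items():
        deleted = fut.result()  # DeletedRecords on success
        # low_watermark is the smallest offset still present in the partition.
        print(f"{tp.topic}[{tp.partition}] low watermark: {deleted.low_watermark}")
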
from confluent_kafka.admin import OffsetSpec -from confluent_kafka import TopicPartition, DeleteRecords +from confluent_kafka import TopicPartition, DeletedRecords def test_delete_records(kafka_cluster): @@ -56,7 +56,7 @@ def test_delete_records(kafka_cluster): # Check if the earliest available offset is equal to the offset passed to the delete records function res = list(fs1.values())[0].result() - assert isinstance(res, DeleteRecords) + assert isinstance(res, DeletedRecords) assert (res.low_watermark == list(fs2.values())[0].result().offset) # Delete created topic From cdd3ae4c06fc6fcd7192e2142af991548f4ca39e Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 27 May 2024 15:52:07 +0530 Subject: [PATCH 37/47] changelog changes --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 042605f32..8b5d7063a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,8 @@ v2.5.0 is a feature release with the following features, fixes and enhancements: - Fixed documentation for default value of operation_timeout in create_topics, delete_topics, create_partitions apis (#1710) -confluent-kafka-python is based on librdkafka v2.4.1, see the -[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1) +confluent-kafka-python is based on librdkafka v2.5.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0) for a complete list of changes, enhancements, fixes and upgrade considerations. From e3b5b96808b1e99e2441177901d7b45c6709d4a3 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 27 May 2024 16:46:25 +0530 Subject: [PATCH 38/47] indentation --- src/confluent_kafka/admin/__init__.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index bd525e316..c563d062a 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1222,18 +1222,18 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): in the specified Topic and Partition. :param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects - consisting of the Topic Partition and Offsets on which we have to perform the deleteion. + consisting of the Topic Partition and Offsets on which we have to perform the deletion. :param float request_timeout: The overall request timeout in seconds, - including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + including broker lookup, request transmission, operation time + on broker, and response. Default: `socket.timeout.ms*1000.0` :param float operation_timeout: The operation timeout in seconds, - controlling how long the delete_records request will block - on the broker waiting for the record deletion to propagate - in the cluster. A value of 0 returns immediately. Default: 60 + controlling how long the delete_records request will block + on the broker waiting for the record deletion to propagate + in the cluster. A value of 0 returns immediately. Default: 60 :returns: A dict of futures keyed by the TopicPartition. 
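Both timeouts documented in these docstrings are ordinary keyword arguments: request_timeout bounds the whole client round trip, while operation_timeout is how long the broker itself waits for the deletion to propagate. An illustrative sketch, with values arbitrary:

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"bootstrap.servers": "localhost:9092"})
    futmap = a.delete_records(
        [TopicPartition("example-topic", 0, 42)],
        request_timeout=30.0,    # client-side bound on the full request
        operation_timeout=60.0,  # broker-side wait for propagation
    )
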
- The future result() method returns DeletedRecords - or raises KafkaException + The future result() method returns DeletedRecords + or raises KafkaException :rtype: dict[TopicPartition, future] From 9f5dcdee4f05aad344e462cc25fd05a87aa72aee Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 27 May 2024 17:18:57 +0530 Subject: [PATCH 39/47] requested changes --- src/confluent_kafka/admin/__init__.py | 13 ++++++------ src/confluent_kafka/src/Admin.c | 30 +++++++++++++-------------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index c563d062a..b496f1e46 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -1216,13 +1216,14 @@ def list_offsets(self, topic_partition_offsets, **kwargs): super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs) return futmap - def delete_records(self, topic_partition_offsets_list, **kwargs): + def delete_records(self, topic_partition_offsets, **kwargs): """ Deletes all the records before the specified offset, in the specified Topic and Partition. - :param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects - consisting of the Topic Partition and Offsets on which we have to perform the deletion. + param list(TopicPartitions) topic_partition_offsets: A list of + TopicPartition objects having `offset` field set to the offset + before which all the records should be deleted. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -1241,10 +1242,10 @@ def delete_records(self, topic_partition_offsets_list, **kwargs): :raises TypeError: Invalid input type. :raises ValueError: Invalid input value. 
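The adminapi example builds this topic_partition_offsets argument from a flat command-line sequence using stride-3 slicing; the idiom stands alone:

    from confluent_kafka import TopicPartition

    args = ["topic-a", "0", "100", "topic-b", "1", "200"]  # flat triples from argv
    topic_partition_offsets = [
        TopicPartition(topic, int(partition), int(offset))
        for topic, partition, offset in zip(args[::3], args[1::3], args[2::3])
    ]
    # -> [TopicPartition("topic-a", 0, 100), TopicPartition("topic-b", 1, 200)]
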
""" - AdminClient._check_delete_records(topic_partition_offsets_list) + AdminClient._check_delete_records(topic_partition_offsets) f, futmap = AdminClient._make_futures_v2( - topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result) + topic_partition_offsets, _TopicPartition, AdminClient._make_futmap_result) - super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs) + super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs) return futmap diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 85cf66f59..e83613ddd 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -2967,16 +2967,16 @@ const char Admin_list_offsets_doc[] = PyDoc_STR( * @brief Delete records */ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ - PyObject *topic_partition_offset = NULL, *future; + PyObject *topic_partition_offsets = NULL, *future; int del_record_cnt = 1; rd_kafka_DeleteRecords_t **c_obj = NULL; struct Admin_options options = Admin_options_INITIALIZER; rd_kafka_AdminOptions_t *c_options = NULL; - rd_kafka_topic_partition_list_t *c_topic_partition_offset = NULL; + rd_kafka_topic_partition_list_t *c_topic_partition_offsets = NULL; CallState cs; rd_kafka_queue_t *rkqu; - static char *kws[] = {"topic_partition_offset", + static char *kws[] = {"topic_partition_offsets", "future", /* options */ "request_timeout", @@ -2984,7 +2984,7 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|ff", kws, - &topic_partition_offset, + &topic_partition_offsets, &future, &options.request_timeout, &options.operation_timeout)) { @@ -3002,15 +3002,15 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ * admin operation is finished, so we need to keep our own refcount. */ Py_INCREF(future); - if (!PyList_Check(topic_partition_offset)) { + if (!PyList_Check(topic_partition_offsets)) { PyErr_SetString(PyExc_ValueError, - "topic_partitions_offset must be a list"); + "topic_partitions_offsets must be a list"); goto err; } - c_topic_partition_offset = py_to_c_parts(topic_partition_offset); + c_topic_partition_offsets = py_to_c_parts(topic_partition_offsets); c_obj = malloc(sizeof(rd_kafka_DeleteRecords_t *) * del_record_cnt); - c_obj[0] = rd_kafka_DeleteRecords_new(c_topic_partition_offset); + c_obj[0] = rd_kafka_DeleteRecords_new(c_topic_partition_offsets); /* Use librdkafka's background thread queue to automatically dispatch * Admin_background_event_cb() when the admin operation is finished. 
*/ @@ -3032,8 +3032,8 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ rd_kafka_DeleteRecords_destroy_array(c_obj, del_record_cnt); free(c_obj); - rd_kafka_topic_partition_list_destroy(c_topic_partition_offset); - Py_XDECREF(topic_partition_offset); + rd_kafka_topic_partition_list_destroy(c_topic_partition_offsets); + Py_XDECREF(topic_partition_offsets); Py_RETURN_NONE; err: @@ -3045,16 +3045,16 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ rd_kafka_AdminOptions_destroy(c_options); Py_DECREF(future); } - if(c_topic_partition_offset) { - rd_kafka_topic_partition_list_destroy(c_topic_partition_offset); + if(c_topic_partition_offsets) { + rd_kafka_topic_partition_list_destroy(c_topic_partition_offsets); } - Py_XDECREF(topic_partition_offset); + Py_XDECREF(topic_partition_offsets); return NULL; } const char Admin_delete_records_doc[] = PyDoc_STR( - ".. py:function:: delete_records(topic_partitions, future, [request_timeout, operation_timeout])\n" + ".. py:function:: delete_records(topic_partition_offsets, future, [request_timeout, operation_timeout])\n" "\n" " Delete all the records for the particular topic partition before the specified offset provided in the request.\n" "\n" @@ -4478,7 +4478,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li size_t i; DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka", - "DeletedRecords"); + "DeletedRecords"); if(!DeletedRecords_type){ cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeletedRecords type"); goto raise; From 227cb0fce67e697a7bcf37d615fa3f0a2a2073fa Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 29 May 2024 15:06:14 +0530 Subject: [PATCH 40/47] requested changes --- src/confluent_kafka/admin/__init__.py | 16 ++++++++++------ src/confluent_kafka/src/Admin.c | 7 ++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index b496f1e46..c623a647b 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -554,7 +554,8 @@ def create_topics(self, new_topics, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate - in the cluster. A value of 0 returns immediately. Default: 60 + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -587,7 +588,8 @@ def delete_topics(self, topics, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate - in the cluster. A value of 0 returns immediately. Default: 60 + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. 
Default: `socket.timeout.ms*1000.0` @@ -625,7 +627,8 @@ def create_partitions(self, new_partitions, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate - in the cluster. A value of 0 returns immediately. Default: 60 + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -1218,8 +1221,8 @@ def list_offsets(self, topic_partition_offsets, **kwargs): def delete_records(self, topic_partition_offsets, **kwargs): """ - Deletes all the records before the specified offset, - in the specified Topic and Partition. + Deletes all the records before the specified offsets (not including), + in the specified Topic and Partitions. param list(TopicPartitions) topic_partition_offsets: A list of TopicPartition objects having `offset` field set to the offset @@ -1230,7 +1233,8 @@ def delete_records(self, topic_partition_offsets, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the delete_records request will block on the broker waiting for the record deletion to propagate - in the cluster. A value of 0 returns immediately. Default: 60 + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` :returns: A dict of futures keyed by the TopicPartition. The future result() method returns DeletedRecords diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index e83613ddd..0503bb8e0 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4474,7 +4474,6 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li PyObject *result = NULL; PyObject *DeletedRecords_type = NULL; - size_t c_topic_partition_cnt = c_topic_partitions->cnt; size_t i; DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka", @@ -4485,7 +4484,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li } result = PyDict_New(); - for(i=0; icnt; i++){ PyObject *key = NULL; PyObject *value = NULL; @@ -4504,8 +4503,10 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li Py_DECREF(args); Py_DECREF(kwargs); - if (value == NULL) + if (value == NULL){ + Py_DECREF(key); goto raise; + } } PyDict_SetItem(result, key, value); From e359c0f40798d14e498ebc9b6f0d1eb5a52d8ffe Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Wed, 29 May 2024 15:12:24 +0530 Subject: [PATCH 41/47] indentation errors --- src/confluent_kafka/admin/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index c623a647b..77d644227 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -554,8 +554,8 @@ def create_topics(self, new_topics, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate - in the cluster. A value of 0 returns immediately. - Default: `socket.timeout.ms/1000.0` + in the cluster. A value of 0 returns immediately. 
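"Not including" in the docstring above means the offset passed is the first offset that survives the call. A worked sketch under that reading, assuming a topic "t" whose partition 0 currently holds offsets 0 through 4:

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient

    a = AdminClient({"bootstrap.servers": "localhost:9092"})

    # Deleting before offset 2 removes offsets 0 and 1 only; offsets 2, 3, 4
    # remain and 2 becomes the partition's low watermark.
    futmap = a.delete_records([TopicPartition("t", 0, 2)])
    res = list(futmap.values())[0].result()
    assert res.low_watermark == 2
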
+ Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -588,7 +588,7 @@ def delete_topics(self, topics, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate - in the cluster. A value of 0 returns immediately. + in the cluster. A value of 0 returns immediately. Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time @@ -627,8 +627,8 @@ def create_partitions(self, new_partitions, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate - in the cluster. A value of 0 returns immediately. - Default: `socket.timeout.ms/1000.0` + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -1233,7 +1233,7 @@ def delete_records(self, topic_partition_offsets, **kwargs): :param float operation_timeout: The operation timeout in seconds, controlling how long the delete_records request will block on the broker waiting for the record deletion to propagate - in the cluster. A value of 0 returns immediately. + in the cluster. A value of 0 returns immediately. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures keyed by the TopicPartition. 
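
For orientation before the remaining patches, here is how the API documented by the two patches above is driven end to end. This is a minimal sketch, assuming a broker reachable at localhost:9092 and an existing topic named "mytopic" (both placeholders); note that the requested offset is an exclusive bound.

from confluent_kafka import TopicPartition, KafkaException
from confluent_kafka.admin import AdminClient

# Placeholder broker and topic; any reachable cluster with an existing,
# already-populated topic will do.
a = AdminClient({"bootstrap.servers": "localhost:9092"})

# Delete records at offsets 0..9 in partition 0 of "mytopic"; the record
# at offset 10 survives, since deletion stops before the requested offset.
futmap = a.delete_records([TopicPartition("mytopic", 0, 10)],
                          request_timeout=30,    # overall request budget, seconds
                          operation_timeout=60)  # broker-side propagation wait, seconds

for tp, fut in futmap.items():
    try:
        deleted = fut.result()  # DeletedRecords on success
        print(f"{tp.topic} [{tp.partition}] low watermark is now {deleted.low_watermark}")
    except KafkaException as e:
        print(f"{tp.topic} [{tp.partition}] failed: {e}")
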
From d93162aaa22ec5ccb37d85c0b996c3843bd53ed6 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Thu, 30 May 2024 12:44:34 +0530 Subject: [PATCH 42/47] minor change --- src/confluent_kafka/src/Admin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 0503bb8e0..7333b22a0 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4474,7 +4474,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li PyObject *result = NULL; PyObject *DeletedRecords_type = NULL; - size_t i; + int i; DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka", "DeletedRecords"); From 8172ca8e12899a070a558fe48f3055fc126fd6bd Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 8 Jul 2024 12:31:22 +0530 Subject: [PATCH 43/47] PR Comments --- examples/adminapi.py | 22 ++++++++------- src/confluent_kafka/__init__.py | 5 ++-- src/confluent_kafka/_model/__init__.py | 14 ---------- src/confluent_kafka/admin/__init__.py | 22 ++++++++------- src/confluent_kafka/admin/_records.py | 27 +++++++++++++++++++ src/confluent_kafka/src/Admin.c | 15 +++++------ .../integration/admin/test_delete_records.py | 4 +-- 7 files changed, 64 insertions(+), 45 deletions(-) create mode 100644 src/confluent_kafka/admin/_records.py diff --git a/examples/adminapi.py b/examples/adminapi.py index 92a3559ad..8442d6453 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -849,25 +849,29 @@ def example_delete_records(a, args): if len(args) == 0: raise ValueError( "Invalid number of arguments for delete_records, expected at least 3 " + - "(Usage: delete_records [ ...])") + "(Usage: delete_records [ ..])") if len(args) % 3 != 0: raise ValueError( "Invalid number of arguments for delete_records " + - "(Usage: delete_records [ ...])") + "(Usage: delete_records [ ..])") - topic_partition_offset = [ + topic_partition_offsets = [ TopicPartition(topic, int(partition), int(offset)) for topic, partition, offset in zip(args[::3], args[1::3], args[2::3]) ] - futmap = a.delete_records(topic_partition_offset) + futmap = a.delete_records(topic_partition_offsets) for partition, fut in futmap.items(): try: result = fut.result() - print( - f"All records deleted before offset {partition.offset} in topic {partition.topic}" + - f" partition {partition.partition}. The minimum offset in this partition" + - f" is now {result.low_watermark}") + if partition.offset == -1: + print(f"All records deleted in topic {partition.topic} partition {partition.partition}." + + f"The minimum offset in this partition is now {result.low_watermark}") + else: + print( + f"All records deleted before offset {partition.offset} in topic {partition.topic}" + + f" partition {partition.partition}. 
The minimum offset in this partition" + + f" is now {result.low_watermark}") except KafkaException as e: print( f"Error deleting records in topic {partition.topic} partition {partition.partition}" + @@ -912,7 +916,7 @@ def example_delete_records(a, args): ' DELETE ..]\n') sys.stderr.write(' list_offsets ' + '[ ..]\n') - sys.stderr.write(' delete_records [ ...]\n') + sys.stderr.write(' delete_records [ ..]\n') sys.exit(1) broker = sys.argv[1] diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index af823df98..6a24d6e9e 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -24,8 +24,7 @@ ConsumerGroupState, TopicCollection, TopicPartitionInfo, - IsolationLevel, - DeletedRecords) + IsolationLevel) from .cimpl import (Producer, Consumer, @@ -50,7 +49,7 @@ 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node', 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid', - 'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo', 'DeletedRecords'] + 'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo'] __version__ = version()[0] diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 5065bc8ff..1c2ec89f0 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -149,17 +149,3 @@ def __lt__(self, other): if self.__class__ != other.__class__: return NotImplemented return self.value < other.value - - -class DeletedRecords: - """ - DeletedRecords - Represents information about deleted records. - - Parameters - ---------- - low_watermark: int - The "low watermark" for the topic partition on which the deletion was executed. - """ - def __init__(self, low_watermark): - self.low_watermark = low_watermark diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index b4d356fa1..8685b2985 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -54,6 +54,8 @@ from ._listoffsets import (OffsetSpec, # noqa: F401 ListOffsetsResultInfo) +from ._records import DeletedRecords # noqa: F401 + from .._model import TopicCollection as _TopicCollection from ..cimpl import (KafkaException, # noqa: F401 @@ -1225,17 +1227,19 @@ def delete_records(self, topic_partition_offsets, **kwargs): Deletes all the records before the specified offsets (not including), in the specified Topic and Partitions. - param list(TopicPartitions) topic_partition_offsets: A list of - TopicPartition objects having `offset` field set to the offset - before which all the records should be deleted. + :param list(TopicPartitions) topic_partition_offsets: A list of + TopicPartition objects having `offset` field set to the offset + before which all the records should be deleted. + `offset` can be set to :py:const:`OFFSET_END` (-1) to delete all records + in the partition. :param float request_timeout: The overall request timeout in seconds, - including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + including broker lookup, request transmission, operation time + on broker, and response. Default: `socket.timeout.ms*1000.0` :param float operation_timeout: The operation timeout in seconds, - controlling how long the delete_records request will block - on the broker waiting for the record deletion to propagate - in the cluster. A value of 0 returns immediately. 
- Default: `socket.timeout.ms/1000.0` + controlling how long the delete_records request will block + on the broker waiting for the record deletion to propagate + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` :returns: A dict of futures keyed by the TopicPartition. The future result() method returns DeletedRecords diff --git a/src/confluent_kafka/admin/_records.py b/src/confluent_kafka/admin/_records.py new file mode 100644 index 000000000..4638924e4 --- /dev/null +++ b/src/confluent_kafka/admin/_records.py @@ -0,0 +1,27 @@ +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class DeletedRecords: + """ + DeletedRecords + Represents information about deleted records. + + Parameters + ---------- + low_watermark: int + The "low watermark" for the topic partition on which the deletion was executed. + """ + def __init__(self, low_watermark): + self.low_watermark = low_watermark diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 7333b22a0..bd33590d2 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -2993,7 +2993,7 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETERECORDS, &options, future); - if (!c_options) { + if (!c_options) { goto err; /* Exception raised by options_to_c() */ } @@ -3002,12 +3002,11 @@ PyObject* Admin_delete_records (Handle *self,PyObject *args,PyObject *kwargs){ * admin operation is finished, so we need to keep our own refcount. */ Py_INCREF(future); - if (!PyList_Check(topic_partition_offsets)) { - PyErr_SetString(PyExc_ValueError, - "topic_partitions_offsets must be a list"); - goto err; - } c_topic_partition_offsets = py_to_c_parts(topic_partition_offsets); + + if(!c_topic_partition_offsets) { + goto err; /* Exception raised by py_to_c_parts() */ + } c_obj = malloc(sizeof(rd_kafka_DeleteRecords_t *) * del_record_cnt); c_obj[0] = rd_kafka_DeleteRecords_new(c_topic_partition_offsets); @@ -4476,7 +4475,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li int i; - DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka", + DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka.admin", "DeletedRecords"); if(!DeletedRecords_type){ cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeletedRecords type"); @@ -4503,7 +4502,7 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li Py_DECREF(args); Py_DECREF(kwargs); - if (value == NULL){ + if (!value){ Py_DECREF(key); goto raise; } diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index 0e24bece8..0d3cc2091 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from confluent_kafka.admin import OffsetSpec -from confluent_kafka import TopicPartition, DeletedRecords +from confluent_kafka.admin import OffsetSpec, DeletedRecords +from confluent_kafka import TopicPartition def test_delete_records(kafka_cluster): From 60a42588dc4475a9918af25a6ee8312f795a73ce Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 8 Jul 2024 12:37:37 +0530 Subject: [PATCH 44/47] Added new test suggested in the PR --- .../integration/admin/test_delete_records.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/integration/admin/test_delete_records.py b/tests/integration/admin/test_delete_records.py index 0d3cc2091..fdbaca749 100644 --- a/tests/integration/admin/test_delete_records.py +++ b/tests/integration/admin/test_delete_records.py @@ -63,3 +63,74 @@ def test_delete_records(kafka_cluster): fs = admin_client.delete_topics([topic]) for topic, f in fs.items(): f.result() + + +def test_delete_records_multiple_topics_and_partitions(kafka_cluster): + """ + Test delete_records, delete the records upto the specified offset + in that particular partition of the specified topic. + """ + admin_client = kafka_cluster.admin() + num_partitions = 3 + # Create two topics with a single partition + topic = kafka_cluster.create_topic("test-del-records", + { + "num_partitions": num_partitions, + "replication_factor": 1, + }) + topic2 = kafka_cluster.create_topic("test-del-records2", + { + "num_partitions": num_partitions, + "replication_factor": 1, + }) + topics = [topic, topic2] + partitions = list(range(num_partitions)) + # Create Producer instance + p = kafka_cluster.producer() + for t in topics: + for partition in partitions: + p.produce(t, "Message-1", partition=partition) + p.produce(t, "Message-2", partition=partition) + p.produce(t, "Message-3", partition=partition) + p.flush() + requests = dict( + [ + (TopicPartition(t, partition), OffsetSpec.earliest()) + for t in topics + for partition in partitions + ] + ) + # Check if the earliest available offset for this topic partition is 0 + fs = admin_client.list_offsets(requests) + assert all([p.result().offset == 0 for p in fs.values()]) + delete_index = 0 + # Delete the records + for delete_partitions in [ + # Single partition no deletion + [TopicPartition(topic, 0, 0)], + # Single topic, two partitions, single record deleted + [TopicPartition(topic, 0, 1), TopicPartition(topic, 1, 1)], + # Two topics, four partitions, two records deleted + [TopicPartition(topic, 2, 2), TopicPartition(topic2, 0, 2), + TopicPartition(topic2, 1, 2), TopicPartition(topic2, 2, 2)], + ]: + list_offsets_requests = dict([ + (part, OffsetSpec.earliest()) for part in delete_partitions + ]) + futmap_delete = admin_client.delete_records(delete_partitions) + delete_results = [(part, fut.result()) + for part, fut in futmap_delete.items()] + futmap_list = admin_client.list_offsets(list_offsets_requests) + list_results = dict([(part, fut.result()) + for part, fut in futmap_list.items()]) + for part, delete_result in delete_results: + list_result = list_results[part] + assert isinstance(delete_result, DeletedRecords) + assert delete_result.low_watermark == list_result.offset + assert delete_result.low_watermark == delete_index + delete_index += 1 + + # Delete created topics + fs = admin_client.delete_topics(topics) + for topic, f in fs.items(): + f.result() From 65b34176020398cd5c54a9231c9177971012c385 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> 
Date: Mon, 8 Jul 2024 17:24:47 +0530 Subject: [PATCH 45/47] Fixed a memory leak in ListOffsets Result --- CHANGELOG.md | 1 + src/confluent_kafka/src/Admin.c | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee1aa9886..a6ffe4ce0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ v2.5.0 is a feature release with the following features, fixes and enhancements: - Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy. - Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object - Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property + - Fixed a memory leak in ListOffsets Result. confluent-kafka-python is based on librdkafka v2.5.0, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index bd33590d2..f4dd3366a 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -4436,6 +4436,7 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset result = PyDict_New(); for(i=0; i Date: Mon, 8 Jul 2024 18:24:37 +0530 Subject: [PATCH 46/47] PR comments --- CHANGELOG.md | 2 +- src/confluent_kafka/src/Admin.c | 6 ++---- src/confluent_kafka/src/Metadata.c | 1 + 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ffe4ce0..1443bcdd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ v2.5.0 is a feature release with the following features, fixes and enhancements: - Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy. - Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object - Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property - - Fixed a memory leak in ListOffsets Result. + - Fixed some memory leaks related to `PyDict_SetItem`. 
 confluent-kafka-python is based on librdkafka v2.5.0, see the
 [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.5.0)

diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c
index f4dd3366a..c58166d6e 100644
--- a/src/confluent_kafka/src/Admin.c
+++ b/src/confluent_kafka/src/Admin.c
@@ -4480,10 +4480,8 @@ static PyObject *Admin_c_DeletedRecords_to_py (const rd_kafka_topic_partition_li
 
         DeletedRecords_type = cfl_PyObject_lookup("confluent_kafka.admin",
                                                   "DeletedRecords");
-        if(!DeletedRecords_type){
-                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, "Unable to load DeletedRecords type");
-                goto raise;
-        }
+        if(!DeletedRecords_type)
+                goto raise; /* Exception raised by lookup() */
 
         result = PyDict_New();
         for(i=0; i<c_topic_partitions->cnt; i++){
diff --git a/src/confluent_kafka/src/Metadata.c b/src/confluent_kafka/src/Metadata.c
index 31e1db933..cdbf23d2f 100644
--- a/src/confluent_kafka/src/Metadata.c
+++ b/src/confluent_kafka/src/Metadata.c
@@ -257,6 +257,7 @@ static PyObject *c_brokers_to_py (Handle *self,
                         goto err;
                 }
 
+                Py_DECREF(key);
                 Py_DECREF(broker);
         }
 

From e1f484db9557efefaeac262d22ae076fd90acf4b Mon Sep 17 00:00:00 2001
From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Date: Tue, 9 Jul 2024 03:04:23 +0530
Subject: [PATCH 47/47] PR comments related to documentation

---
 CHANGELOG.md                          |  2 +-
 docs/index.rst                        | 10 +++++
 src/confluent_kafka/admin/__init__.py | 54 +++++++++++++--------------
 3 files changed, 38 insertions(+), 28 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1443bcdd0..5d5599e9a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,7 @@ v2.5.0 is a feature release with the following features, fixes and enhancements:
  - Added an example to show the usage of the custom logger with `AdminClient`
  - Removed usage of `strcpy` to enhance security of the client (#1745)
  - Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- - Fixed documentation for default value of operation_timeout in create_topics, delete_topics, create_partitions apis (#1710)
+ - Fixed documentation for default value of `operation_timeout` and `request_timeout` in various Admin apis (#1710)
 - Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
 - Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
 - Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property
diff --git a/docs/index.rst b/docs/index.rst
index a74a8766b..185007280 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -77,6 +77,7 @@ Supporting classes
   - :ref:`MemberAssignment <pythonclient_member_assignment>`
   - :ref:`MemberDescription <pythonclient_member_description>`
   - :ref:`ConsumerGroupDescription <pythonclient_consumer_group_description>`
+  - :ref:`DeletedRecords <pythonclient_deleted_records>`
 
 Experimental
    These classes are experimental and are likely to be removed, or subject to incompatible
@@ -387,6 +388,15 @@ ConsumerGroupDescription
 .. autoclass:: confluent_kafka.admin.ConsumerGroupDescription
    :members:
 
+.. _pythonclient_deleted_records:
+
+**************
+DeletedRecords
+**************
+
+.. autoclass:: confluent_kafka.admin.DeletedRecords
+   :members:
+
 ..
_pythonclient_member_assignment: **************** diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 8685b2985..9101b651f 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -561,7 +561,7 @@ def create_topics(self, new_topics, **kwargs): Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :param bool validate_only: If true, the request is only validated without creating the topic. Default: False @@ -595,7 +595,7 @@ def delete_topics(self, topics, **kwargs): Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each topic, keyed by the topic name. The future result() method returns None. @@ -634,7 +634,7 @@ def create_partitions(self, new_partitions, **kwargs): Default: `socket.timeout.ms/1000.0` :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :param bool validate_only: If true, the request is only validated without creating the partitions. Default: False @@ -668,7 +668,7 @@ def describe_configs(self, resources, **kwargs): :param list(ConfigResource) resources: Resources to get the configuration for. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each resource, keyed by the ConfigResource. The type of the value returned by the future result() method is @@ -712,7 +712,7 @@ def alter_configs(self, resources, **kwargs): :param list(ConfigResource) resources: Resources to update configuration of. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0`. + on broker, and response. Default: `socket.timeout.ms/1000.0`. :param bool validate_only: If true, the request is validated only, without altering the configuration. Default: False @@ -745,7 +745,7 @@ def incremental_alter_configs(self, resources, **kwargs): :param list(ConfigResource) resources: Resources to update configuration of. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0`. + on broker, and response. Default: `socket.timeout.ms/1000.0`. :param bool validate_only: If true, the request is validated only, without altering the configuration. Default: False :param int broker: Broker id to send the request to. When @@ -777,7 +777,7 @@ def create_acls(self, acls, **kwargs): to create. 
:param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each ACL binding, keyed by the :class:`AclBinding` object. The future result() method returns None on success. @@ -816,7 +816,7 @@ def describe_acls(self, acl_binding_filter, **kwargs): that is a prefix of the given resource name :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A future returning a list(:class:`AclBinding`) as result @@ -851,7 +851,7 @@ def delete_acls(self, acl_binding_filters, **kwargs): that is a prefix of the given resource name :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each ACL binding filter, keyed by the :class:`AclBindingFilter` object. The future result() method returns a list of :class:`AclBinding`. @@ -878,7 +878,7 @@ def list_consumer_groups(self, **kwargs): :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :param set(ConsumerGroupState) states: only list consumer groups which are currently in these states. @@ -915,7 +915,7 @@ def describe_consumer_groups(self, group_ids, **kwargs): :param bool include_authorized_operations: If True, fetches group AclOperations. Default: False :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each group, keyed by the group_id. The future result() method returns :class:`ConsumerGroupDescription`. @@ -948,7 +948,7 @@ def describe_topics(self, topics, **kwargs): :param bool include_authorized_operations: If True, fetches topic AclOperations. Default: False :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each topic, keyed by the topic. The future result() method returns :class:`TopicDescription`. @@ -982,7 +982,7 @@ def describe_cluster(self, **kwargs): :param bool include_authorized_operations: If True, fetches topic AclOperations. Default: False :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A future returning description of the cluster as result @@ -1006,7 +1006,7 @@ def delete_consumer_groups(self, group_ids, **kwargs): :param list(str) group_ids: List of group_ids which need to be deleted. 
:param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each group, keyed by the group_id. The future result() method returns None. @@ -1043,7 +1043,7 @@ def list_consumer_group_offsets(self, list_consumer_group_offsets_request, **kwa :param bool require_stable: If True, fetches stable offsets. Default: False :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each group, keyed by the group id. The future result() method returns :class:`ConsumerGroupTopicPartitions`. @@ -1076,7 +1076,7 @@ def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **k partition; and corresponding offset to be updated. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures for each group, keyed by the group id. The future result() method returns :class:`ConsumerGroupTopicPartitions`. @@ -1126,7 +1126,7 @@ def describe_user_scram_credentials(self, users=None, **kwargs): to describe all user's credentials. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: In case None is passed it returns a single future. The future yields a dict[str, UserScramCredentialsDescription] @@ -1162,7 +1162,7 @@ def alter_user_scram_credentials(self, alterations, **kwargs): The pair (user, mechanism) must be unique among alterations. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures keyed by user name. The future result() method returns None or @@ -1193,7 +1193,7 @@ def list_offsets(self, topic_partition_offsets, **kwargs): querying. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time - on broker, and response. Default: `socket.timeout.ms*1000.0` + on broker, and response. Default: `socket.timeout.ms/1000.0` :returns: A dict of futures keyed by TopicPartition. The future result() method returns ListOffsetsResultInfo @@ -1225,25 +1225,25 @@ def list_offsets(self, topic_partition_offsets, **kwargs): def delete_records(self, topic_partition_offsets, **kwargs): """ Deletes all the records before the specified offsets (not including), - in the specified Topic and Partitions. + in the specified topics and partitions. 
-        :param list(TopicPartitions) topic_partition_offsets: A list of
-            TopicPartition objects having `offset` field set to the offset
+        :param list(TopicPartition) topic_partition_offsets: A list of
+            :class:`.TopicPartition` objects having `offset` field set to the offset
             before which all the records should be deleted.
             `offset` can be set to :py:const:`OFFSET_END` (-1) to delete all records
             in the partition.
         :param float request_timeout: The overall request timeout in seconds,
             including broker lookup, request transmission, operation time
-            on broker, and response. Default: `socket.timeout.ms*1000.0`
+            on broker, and response. Default: `socket.timeout.ms/1000.0`
         :param float operation_timeout: The operation timeout in seconds,
-            controlling how long the delete_records request will block
+            controlling how long the `delete_records` request will block
             on the broker waiting for the record deletion to propagate
             in the cluster. A value of 0 returns immediately.
             Default: `socket.timeout.ms/1000.0`
 
-        :returns: A dict of futures keyed by the TopicPartition.
-            The future result() method returns DeletedRecords
-            or raises KafkaException
+        :returns: A dict of futures keyed by the :class:`.TopicPartition`.
+            The future result() method returns :class:`.DeletedRecords`
+            or raises :class:`.KafkaException`
 
         :rtype: dict[TopicPartition, future]
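
To close, a sketch of the finished API in use, under the same placeholder broker and topic assumptions as the earlier sketch: delete every record in a partition with OFFSET_END, then confirm the new low watermark with list_offsets, mirroring the integration test added in PATCH 44.

from confluent_kafka import TopicPartition, OFFSET_END
from confluent_kafka.admin import AdminClient, OffsetSpec

a = AdminClient({"bootstrap.servers": "localhost:9092"})

# OFFSET_END (-1) requests deletion of every record in the partition.
target = TopicPartition("mytopic", 0, OFFSET_END)
for tp, fut in a.delete_records([target]).items():
    low_watermark = fut.result().low_watermark

# The earliest listable offset should now equal the reported low watermark.
requests = {TopicPartition("mytopic", 0): OffsetSpec.earliest()}
for tp, fut in a.list_offsets(requests).items():
    assert fut.result().offset == low_watermark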
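
Because delete_records returns one future per requested partition, a failure on one partition does not mask success on another. A sketch of that per-partition error handling follows, with placeholder topic names, in the same style as example_delete_records above.

from confluent_kafka import TopicPartition, KafkaException
from confluent_kafka.admin import AdminClient

a = AdminClient({"bootstrap.servers": "localhost:9092"})
requests = [
    TopicPartition("existing-topic", 0, 5),  # expected to succeed
    TopicPartition("missing-topic", 0, 5),   # expected to fail on its own future
]

for tp, fut in a.delete_records(requests).items():
    try:
        print(f"{tp.topic} [{tp.partition}] low watermark: {fut.result().low_watermark}")
    except KafkaException as e:
        # Errors surface per partition through the future, not from the call itself.
        print(f"{tp.topic} [{tp.partition}] error: {e}")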