Skip to content

Commit

Permalink
Requested Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
PratRanj07 committed May 8, 2024
1 parent 654e069 commit 86ae277
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 35 deletions.
12 changes: 3 additions & 9 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,19 +862,13 @@ def example_delete_records(a, args):
for partition, fut in futmap.items():
try:
result = fut.result()
if result.error:
print(f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
f" before offset {partition.offset}: {result.error.str()}")
else:
print(
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
print(
f"All records deleted before offset {partition.offset} in topic {partition.topic}" +
f" partition {partition.partition}. The minimum offset in this partition is now {result.offset}")
except KafkaException as e:
print(
f"Error deleting records in topic {partition.topic} partition {partition.partition}" +
f" before offset {partition.offset}: {e}")
except Exception:
raise


if __name__ == '__main__':
Expand Down
5 changes: 3 additions & 2 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
ConsumerGroupState,
TopicCollection,
TopicPartitionInfo,
IsolationLevel)
IsolationLevel,
DeleteRecordsResult )

from .cimpl import (Producer,
Consumer,
Expand All @@ -49,7 +50,7 @@
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
'IsolationLevel']
'IsolationLevel', 'DeleteRecordsResult']

__version__ = version()[0]

Expand Down
13 changes: 13 additions & 0 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,16 @@ def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value

class DeleteRecordsResult:
    """
    DeleteRecordsResult

    Result of an `AdminClient.delete_records` call associated to a partition.

    Parameters
    ----------
    offset: int
        The offset returned by the delete_records call for this partition.
    """
    def __init__(self, offset: int):
        # Offset reported by the broker after record deletion for the
        # associated topic partition.
        self.offset = offset
12 changes: 4 additions & 8 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,8 @@ def _check_delete_records(request):
for req in request:
if not isinstance(req, _TopicPartition):
raise TypeError("Element of the request list must be of type 'TopicPartition' ")
if req is None:
raise ValueError("Individual request in the request list cannot be 'None'")
if req.partition < 0:
raise ValueError("Elements of the list must not have negative value for 'partition' field")
raise ValueError(" 'partition' cannot be negative")

def create_topics(self, new_topics, **kwargs):
"""
Expand Down Expand Up @@ -1235,10 +1233,8 @@ def delete_records(self, topic_partition_offsets_list, **kwargs):
in the cluster. A value of 0 returns immediately. Default: 0
:returns: A dict of futures keyed by the TopicPartition.
The future result() method returns a TopicPartition list indicating that
deletion operation have been performed till the specified Topic Partition
and error if any has occured. User has to check if any error has occured
during deletion in each partition.
The future result() method returns DeleteRecordsResult
or raises KafkaException
:rtype: dict[TopicPartition, future]
Expand All @@ -1249,7 +1245,7 @@ def delete_records(self, topic_partition_offsets_list, **kwargs):
AdminClient._check_delete_records(topic_partition_offsets_list)

f, futmap = AdminClient._make_futures_v2(
topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result_from_list)
topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result)

super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs)
return futmap
46 changes: 45 additions & 1 deletion src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -4470,6 +4470,50 @@ static PyObject *Admin_c_ListOffsetsResultInfos_to_py (const rd_kafka_ListOffset
return NULL;
}

/**
 * @brief Convert a DeleteRecords result topic+partition list into a
 *        Python dict mapping TopicPartition -> DeleteRecordsResult,
 *        or TopicPartition -> KafkaError for partitions that failed.
 *
 * @returns New reference to the dict, or NULL (with a Python exception
 *          set) on failure.
 */
static PyObject *Admin_c_DeleteRecordsResult_to_py (const rd_kafka_topic_partition_list_t *c_topic_partitions) {

        size_t c_topic_partition_cnt = c_topic_partitions->cnt;
        PyObject *result = NULL;
        PyObject *DeleteRecordsResult_type = NULL;
        size_t i;

        DeleteRecordsResult_type = cfl_PyObject_lookup("confluent_kafka",
                                                       "DeleteRecordsResult");
        if (!DeleteRecordsResult_type) {
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                 "Unable to load DeleteRecordsResult type");
                return NULL;
        }

        result = PyDict_New();
        if (!result)
                goto raise;

        for (i = 0; i < c_topic_partition_cnt; i++) {
                PyObject *key = NULL;
                PyObject *value = NULL;
                rd_kafka_topic_partition_t *c_topic_partition =
                        &c_topic_partitions->elems[i];

                if (c_topic_partition->err) {
                        /* Per-partition failure: the dict value is the
                         * KafkaError so the caller can raise it. */
                        value = KafkaError_new_or_None(
                                c_topic_partition->err,
                                rd_kafka_err2str(c_topic_partition->err));
                } else {
                        PyObject *args = PyTuple_New(0);
                        PyObject *kwargs = PyDict_New();
                        cfl_PyDict_SetLong(kwargs, "offset",
                                           c_topic_partition->offset);
                        value = PyObject_Call(DeleteRecordsResult_type,
                                              args, kwargs);
                        Py_DECREF(args);
                        Py_DECREF(kwargs);
                }
                if (!value)
                        goto raise;

                key = c_part_to_py(c_topic_partition);
                if (!key) {
                        Py_DECREF(value);
                        goto raise;
                }

                /* PyDict_SetItem does NOT steal references: drop our own
                 * refs to both key and value afterwards (the original code
                 * leaked the key). */
                if (PyDict_SetItem(result, key, value) < 0) {
                        Py_DECREF(key);
                        Py_DECREF(value);
                        goto raise;
                }
                Py_DECREF(key);
                Py_DECREF(value);
        }

        Py_DECREF(DeleteRecordsResult_type);
        return result;
raise:
        Py_XDECREF(result);
        Py_DECREF(DeleteRecordsResult_type);
        return NULL;
}

/**
* @brief Event callback triggered from librdkafka's background thread
* when Admin API results are ready.
Expand Down Expand Up @@ -4818,7 +4862,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev);
const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res);

result = c_parts_to_py(c_delete_records_res_list);
result = Admin_c_DeleteRecordsResult_to_py(c_delete_records_res_list);
break;
}

Expand Down
22 changes: 7 additions & 15 deletions tests/integration/admin/test_delete_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec
from confluent_kafka import TopicPartition
from confluent_kafka.admin import OffsetSpec
from confluent_kafka import TopicPartition, DeleteRecordsResult


def test_delete_records(kafka_cluster):
Expand Down Expand Up @@ -43,29 +43,21 @@ def test_delete_records(kafka_cluster):

# Check if the earliest available offset for this topic partition is 0
fs = admin_client.list_offsets(requests)
for _, fut in fs.items():
result = fut.result()
assert isinstance(result, ListOffsetsResultInfo)
assert (result.offset == 0)
result = list(fs.values())[0].result()
assert (result.offset == 0)

topic_partition_offset = TopicPartition(topic, 0, 2)

# Delete the records
fs1 = admin_client.delete_records([topic_partition_offset])
earliest_offset_available = 0

# Find the earliest available offset for that specific topic partition after deletion has been done
fs2 = admin_client.list_offsets(requests)
for _, fut in fs2.items():
result = fut.result()
assert isinstance(result, ListOffsetsResultInfo)
earliest_offset_available = result.offset

# Check if the earliest available offset is equal to the offset passed to the delete records function
for _, fut in fs1.items():
result = fut.result()
assert isinstance(result, TopicPartition)
assert (result.offset == earliest_offset_available)
res = list(fs1.values())[0].result()
assert isinstance(res, DeleteRecordsResult)
assert (res.offset == list(fs2.values())[0].result().offset)

# Delete created topic
fs = admin_client.delete_topics([topic])
Expand Down

0 comments on commit 86ae277

Please sign in to comment.