-
Notifications
You must be signed in to change notification settings - Fork 901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a delete records api #1710
Merged
Merged
Changes from 33 commits
Commits
Show all changes
49 commits
Select commit
Hold shift + click to select a range
863a1e8
Added a delete records api
PratRanj07 93fc6c3
Corrected semaphore errors
PratRanj07 7336986
Corrected another semaphore errors
PratRanj07 4417733
Corrected yet another semaphore errors
PratRanj07 f7952b6
Corrected more semaphore errors
PratRanj07 6293a56
Minor changes
PratRanj07 b4a1f08
Whitespace error
PratRanj07 900540e
Requested changes
PratRanj07 49ced95
Whitespace error
PratRanj07 5e67ea3
Minor changes
PratRanj07 684efd8
Requested changes
PratRanj07 f0626ff
Whitespace error
PratRanj07 60b8feb
Error handling change in example
PratRanj07 654e069
Minor change in error handling example
PratRanj07 86ae277
Requested Changes
PratRanj07 5afe007
semaphore errors corrected
PratRanj07 d7f37db
Update examples/adminapi.py
PratRanj07 5e4cd7d
Update examples/adminapi.py
PratRanj07 50443bc
Update src/confluent_kafka/_model/__init__.py
PratRanj07 bd64ea6
Update src/confluent_kafka/admin/__init__.py
PratRanj07 fddaa6a
Update src/confluent_kafka/admin/__init__.py
PratRanj07 58ac04c
Update src/confluent_kafka/src/Admin.c
PratRanj07 9ba4168
Update src/confluent_kafka/src/Admin.c
PratRanj07 8f7f2a6
Update src/confluent_kafka/admin/__init__.py
PratRanj07 9f3152d
Update src/confluent_kafka/__init__.py
PratRanj07 e176626
Update src/confluent_kafka/src/Admin.c
PratRanj07 fb202c8
Update src/confluent_kafka/src/Admin.c
PratRanj07 766aa16
Merge branch 'master' into deleterecords
PratRanj07 e240e2f
requested changes
PratRanj07 cd5cfac
semaphore errors
PratRanj07 e46e3d6
iformatting errors
PratRanj07 1366b2d
small change
PratRanj07 8ed70cb
small change
PratRanj07 dc1e05c
requested changes
PratRanj07 8483b12
indentation errors
PratRanj07 293a050
indentation errors
PratRanj07 fba0f7b
Requested changes
PratRanj07 cdd3ae4
changelog changes
PratRanj07 e3b5b96
indentation
PratRanj07 9f5dcde
requested changes
PratRanj07 227cb0f
requested changes
PratRanj07 e359c0f
indentation errors
PratRanj07 d93162a
minor change
PratRanj07 d965435
Merge branch 'master' into deleterecords
pranavrth 8172ca8
PR Comments
pranavrth 60a4258
Added new test suggested in the PR
pranavrth 65b3417
Fixed a memory leak in ListOffsets Result
pranavrth 3f1a7f3
PR comments
pranavrth e1f484d
PR comments related to documentation
pranavrth File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
emasab marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -534,6 +534,17 @@ def _check_list_offsets_request(topic_partition_offsets, kwargs): | |||||
if not isinstance(kwargs['isolation_level'], _IsolationLevel): | ||||||
raise TypeError("isolation_level argument should be an IsolationLevel") | ||||||
|
||||||
@staticmethod | ||||||
def _check_delete_records(request): | ||||||
if not isinstance(request, list): | ||||||
raise TypeError(f"Expected Request to be a list, got '{type(request).__name__}' ") | ||||||
for req in request: | ||||||
if not isinstance(req, _TopicPartition): | ||||||
raise TypeError("Element of the request list must be of type 'TopicPartition'" + | ||||||
f" got '{type(req).__name__}' ") | ||||||
if req.partition < 0: | ||||||
raise ValueError("'partition' cannot be negative") | ||||||
|
||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
def create_topics(self, new_topics, **kwargs): | ||||||
""" | ||||||
Create one or more new topics. | ||||||
|
@@ -543,7 +554,7 @@ def create_topics(self, new_topics, **kwargs): | |||||
:param float operation_timeout: The operation timeout in seconds, | ||||||
controlling how long the CreateTopics request will block | ||||||
on the broker waiting for the topic creation to propagate | ||||||
in the cluster. A value of 0 returns immediately. Default: 0 | ||||||
in the cluster. A value of 0 returns immediately. Default: 60 | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
:param float request_timeout: The overall request timeout in seconds, | ||||||
including broker lookup, request transmission, operation time | ||||||
on broker, and response. Default: `socket.timeout.ms*1000.0` | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
@@ -576,7 +587,7 @@ def delete_topics(self, topics, **kwargs): | |||||
:param float operation_timeout: The operation timeout in seconds, | ||||||
controlling how long the DeleteTopics request will block | ||||||
on the broker waiting for the topic deletion to propagate | ||||||
in the cluster. A value of 0 returns immediately. Default: 0 | ||||||
in the cluster. A value of 0 returns immediately. Default: 60 | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
:param float request_timeout: The overall request timeout in seconds, | ||||||
including broker lookup, request transmission, operation time | ||||||
on broker, and response. Default: `socket.timeout.ms*1000.0` | ||||||
|
@@ -614,7 +625,7 @@ def create_partitions(self, new_partitions, **kwargs): | |||||
:param float operation_timeout: The operation timeout in seconds, | ||||||
controlling how long the CreatePartitions request will block | ||||||
on the broker waiting for the partition creation to propagate | ||||||
in the cluster. A value of 0 returns immediately. Default: 0 | ||||||
in the cluster. A value of 0 returns immediately. Default: 60 | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
:param float request_timeout: The overall request timeout in seconds, | ||||||
including broker lookup, request transmission, operation time | ||||||
on broker, and response. Default: `socket.timeout.ms*1000.0` | ||||||
|
@@ -1204,3 +1215,36 @@ def list_offsets(self, topic_partition_offsets, **kwargs): | |||||
|
||||||
super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs) | ||||||
return futmap | ||||||
|
||||||
def delete_records(self, topic_partition_offsets_list, **kwargs): | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
""" | ||||||
Deletes all the records before the specified offset, | ||||||
in the specified Topic and Partition. | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
:param list(TopicPartitions) topic_partitions_offset_list: A list of TopicPartition objects | ||||||
pranavrth marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
consisting of the Topic Partition and Offsets on which we have to perform the deleteion. | ||||||
:param float request_timeout: The overall request timeout in seconds, | ||||||
including broker lookup, request transmission, operation time | ||||||
on broker, and response. Default: `socket.timeout.ms*1000.0` | ||||||
:param float operation_timeout: The operation timeout in seconds, | ||||||
PratRanj07 marked this conversation as resolved.
Show resolved
Hide resolved
emasab marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
controlling how long the delete_records request will block | ||||||
on the broker waiting for the record deletion to propagate | ||||||
in the cluster. A value of 0 returns immediately. Default: 60 | ||||||
|
||||||
:returns: A dict of futures keyed by the TopicPartition. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
The future result() method returns DeletedRecords | ||||||
or raises KafkaException | ||||||
|
||||||
:rtype: dict[TopicPartition, future] | ||||||
|
||||||
:raises KafkaException: Operation failed locally or on broker. | ||||||
:raises TypeError: Invalid input type. | ||||||
:raises ValueError: Invalid input value. | ||||||
""" | ||||||
AdminClient._check_delete_records(topic_partition_offsets_list) | ||||||
|
||||||
f, futmap = AdminClient._make_futures_v2( | ||||||
topic_partition_offsets_list, _TopicPartition, AdminClient._make_futmap_result) | ||||||
|
||||||
super(AdminClient, self).delete_records(topic_partition_offsets_list, f, **kwargs) | ||||||
return futmap |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add
DeletedRecords
toindex.rst
afterConsumerGroupDescription
and