-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-19630: Reordered OR operands in archiveRecords method for SharePartiton #20391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… method so that short-circuit does not prevent archiving necessary records / batches
apoorvmittal10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I have suggested to add 1 more validation in the test.
| // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal | ||
| // state when the corresponding acquisition lock timer task expires. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the end of this current method:
Can you call sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))). Validate the response and cached state. This will make sure that acknowledge did nothing and also didn't fail. However, I ll discuss this behaviour in a separate PR, when I ll update the method docs.
Then call sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run(); and validate the respective records are directly archived.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. I have made the required changes in the latest commit.
apoorvmittal10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Build is failing though.
As per the current implementation in archiveRecords, when LSO is
updated, if we have multiple record batches before the new LSO, then
only the first one gets archived. This is because of the following lines
of code ->
isAnyOffsetArchived = isAnyOffsetArchived || archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, initialState);isAnyBatchArchived = isAnyBatchArchived || archiveCompleteBatch(inFlightBatch, initialState);The first record / batch will make
isAnyOffsetArchived/isAnyBatchArchivedtrue, after which this line of code willshort-circuit and the methods
archivePerOffsetBatchRecords/archiveCompleteBatchwill not be called again. This PR changes theorder of the expressions so that the short-circuit does not prevent from
archiving all the required batches.
Reviewers: Apoorv Mittal apoorvmittal10@gmail.com