KAFKA-13943: Make LocalLogManager implementation consistent with the RaftClient interface contract#12224
Conversation
dengziming
left a comment
There was a problem hiding this comment.
The first point is an omission of #10909, Thanks.
jsancio
left a comment
There was a problem hiding this comment.
Thanks for the changes @divijvaidya .
For my understanding, what is the motivation for this change? For example, did you discover this inconsistency while working on additional tests?
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
Outdated
Show resolved
Hide resolved
Apologies for late response @jsancio (I was on leave) and thank you for your review. It's interesting that you ask this question because there indeed is a motivation. I was trying to fix the flaky test described in https://issues.apache.org/jira/browse/KAFKA-13943. As you will observe from the stack trace, due to the bug fixed here in this PR, a snapshot was being created with LONG_MAX offset and thus leading to further cascading failures. The test is flaky because it only happens when an append request started off from a leader but when the request was waiting for synchronized lock in the tryAppend() method, the leader changed. Sometimes, the leader doesn't change while it is waiting and sometimes it does. With this fix, I hope to fix that bug. Does this answer your question? Please feel free to add your thoughts to the JIRA directly. |
|
@jsancio this is ready for another review. Thanks for your time again. |
|
@jsancio please review when you get a chance. Currently multiple tests in This code change fixes that. |
tombentley
left a comment
There was a problem hiding this comment.
Thanks @divijvaidya, I left a few comments but I'm not really an expert in this area yet. Hopefully @jsancio can take another look.
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
Outdated
Show resolved
Hide resolved
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
Outdated
Show resolved
Hide resolved
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
Outdated
Show resolved
Hide resolved
|
@hachikuji requesting your review on this PR when you get a chance! |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the fix. Left a minor suggestion.
| private long append(int epoch, List<T> records, boolean isAtomic) { | ||
| if (epoch < this.epoch) { | ||
| throw new NotLeaderException("Append failed because the epoch doesn't match"); | ||
| throw new NotLeaderException("Append failed because the given epoch is stale"); |
There was a problem hiding this comment.
nit: could we add the provided epoch and the current epoch to this message?
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the patch! LGTM.
1\ As per the interface contract for
RaftClient.scheduleAtomicAppend()[1], it should throw aNotLeaderExceptionexception when the provided current leader epoch does not match the current epoch. However, the currentLocalLogManager's implementation of the API returns a LONG_MAX instead of throwing an exception. This change fixes the behaviour and makes it consistent with the interface contract.2\ As per the interface contract for
RaftClient.resign(epoch)[2] if the parameter epoch does not match the current epoch, this call will be ignored. But in the currentLocalLogManagerimplementation the leader epoch might change when the thread is waiting to acquire a lock onshared.tryAppend()(note that tryAppend() is a synchronized method). In such a case, if a NotALeaderException is thrown (as per code change in 1\ above), then resign should be ignored.[1] https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/RaftClient.java#L145
[2] https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/RaftClient.java#L208