KAFKA-13114: Revert state and reregister raft listener #11116
hachikuji merged 6 commits into apache:trunk
Conversation
Force-pushed from a23e1a3 to 6b70e48
RaftClient's scheduleAppend may split the list of records into multiple batches. This means that it is possible for the active controller to see a committed offset for which it doesn't have an in-memory snapshot. If the active controller needs to renounce and it is missing an in-memory snapshot, then revert the state and register a new listener. This will cause the controller to replay the entire metadata partition.
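To make the mechanism concrete, here is a rough, self-contained sketch of that decision; the class, field, and method names (RenounceSketch, snapshotOffsets, reregisterRaftListener) are illustrative stand-ins, not the actual QuorumController code.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Toy model of the decision described above; not the real QuorumController code.
public class RenounceSketch {
    // Offsets at which an in-memory snapshot exists.
    private final NavigableSet<Long> snapshotOffsets = new TreeSet<>();

    void onSnapshotCreated(long offset) {
        snapshotOffsets.add(offset);
    }

    // Called when the active controller has to renounce leadership.
    void renounce(long lastCommittedOffset) {
        if (snapshotOffsets.contains(lastCommittedOffset)) {
            // Normal path: revert in-memory state to the snapshot at the last
            // committed offset by dropping everything newer.
            snapshotOffsets.tailSet(lastCommittedOffset, false).clear();
        } else {
            // scheduleAppend() split a write into multiple batches, so there is a
            // committed offset with no matching snapshot. Throw away all in-memory
            // state and reregister the Raft listener so the controller replays the
            // entire metadata partition.
            snapshotOffsets.clear();
            reregisterRaftListener();
        }
    }

    private void reregisterRaftListener() {
        // In the real code this corresponds to registering a new listener with the
        // RaftClient; modeled as a no-op here.
    }
}
```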
Force-pushed from 6b70e48 to dda2b3d
hachikuji left a comment:
The change seems reasonable, but should we add test cases?
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
I'm trying to think of some approach for validating this logic. It is difficult because it is handling unexpected exceptions. One thought I had is implementing a poison message of some kind which could expire after some TTL. When the controller sees the poison message, it would check if it is still active and raise an exception accordingly. Something like that could be used in an integration test, which might be simpler than trying to induce a failure by mucking with internal state.

Another idea is to corrupt the log on one of the nodes, but I'm not sure this would hit the right path. In fact, this is probably a gap at the moment. If the batch reader fails during iteration, we should probably resign and perhaps even fail. I'll file a separate JIRA for this.

In any case, I think we should try to come up with some way to exercise this path. Otherwise it's hard to say if it even works (though it looks reasonable enough).
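Purely to illustrate the poison-message idea above (PoisonRecord and maybeExplode are hypothetical names, not an existing Kafka record type or API), a sketch could look like this:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the "poison message" idea; not real Kafka code.
public class PoisonRecord {
    private final long expiryNanos;

    public PoisonRecord(long ttlMs) {
        this.expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMs);
    }

    // Called by a (hypothetical) replay path: if the record has not yet expired and
    // this node still believes it is the active controller, fail on purpose so a
    // test can observe the renounce-and-replay behavior.
    public void maybeExplode(boolean isActiveController) {
        if (isActiveController && System.nanoTime() < expiryNanos) {
            throw new RuntimeException("Injected failure from poison record");
        }
    }
}
```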
jsancio left a comment:
@hachikuji I added a test for this case. I had to update LocalLogManager to better match Raft's leader election pattern. It is still not perfect but it is good enough for this test.
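Roughly, that fault injection can be sketched as follows; the class and method names here are hypothetical, but the resignAfterNonAtomicCommit flag mirrors the one visible in the LocalLogManager excerpt further below:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Self-contained sketch of the fault-injection idea; not the actual LocalLogManager.
public class NonAtomicAppendFaultInjector {
    private final AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);

    // Arm the injector so the next multi-batch append is truncated.
    public void injectOnNextAppend() {
        resignAfterNonAtomicCommit.set(true);
    }

    // Returns the batches that actually get written. When armed and the append was
    // split into multiple batches, only the first batch is written and the leader
    // resigns, emulating losing leadership in the middle of a non-atomic append.
    public <T> List<List<T>> maybeTruncate(List<List<T>> batches, Runnable resign) {
        if (batches.size() > 1 && resignAfterNonAtomicCommit.getAndSet(false)) {
            resign.run();
            return batches.subList(0, 1);
        }
        return batches;
    }
}
```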
    throw new IllegalStateException("The raft client was unable to allocate a buffer for an append");
} else if (offset == Long.MAX_VALUE) {
    throw new IllegalStateException("Unable to append records since this is not the leader");
}
    .max();

if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) {
    // Emulate losing leadership in the middle of a non-atomic append by not writing
String topicName = "topic-name";

try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
    try (QuorumControllerTestEnv controlEnv =
nit: could we pull this into the first try?
Cool. I didn't know that was valid Java.
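For context, the suggestion works because Java allows several resources to be declared in a single try-with-resources statement, separated by semicolons and closed in reverse order of declaration. A generic, self-contained illustration (unrelated to the test's actual types):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Generic example: two resources in one try-with-resources statement. Both are
// closed automatically when the block exits, in reverse order of declaration.
public class MultiResourceTry {
    public static void main(String[] args) throws IOException {
        try (FileReader file = new FileReader("example.txt");
             BufferedReader reader = new BufferedReader(file)) {
            System.out.println(reader.readLine());
        }
    }
}
```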
);

// Wait for the new active controller
final QuorumController newController = controlEnv.activeController();
This confused me a little bit since we are trying to verify that the state on the original controller resets properly. That is what is happening here since there is only one controller in the test, but it is obscured a little bit by the new variable. Maybe it would be clearer to use the original reference and write this as:

assertEquals(controller, controlEnv.activeController());

Also, is there an epoch or something we can bump to ensure the transition?
// Wait for the controller to become active again
assertSame(controller, controlEnv.activeController());
assertTrue(
    oldClaimEpoch < controller.curClaimEpoch(),
    String.format("oldClaimEpoch = %s, newClaimEpoch = %s", oldClaimEpoch, controller.curClaimEpoch())
);
Only this should have changed. The rest are indentation changes from the previous commit.
hachikuji left a comment:
LGTM. One minor suggestion.
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
…r.java Co-authored-by: Jason Gustafson <jason@confluent.io>
RaftClient's scheduleAppend may split the list of records into multiple batches. This means that it is possible for the active controller to see a committed offset for which it doesn't have an in-memory snapshot. If the active controller needs to renounce and it is missing an in-memory snapshot, then revert the state and reregister the Raft listener. This will cause the controller to replay the entire metadata partition. Reviewers: Jason Gustafson <jason@confluent.io>