KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1) #12240
…when broker requests a controlled shutdown
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
...ta/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
  "type": "metadata",
  "name": "BrokerRegistrationChangeRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
Okay. I am not sure where or how to document this decision but it would be good to be consistent across change type records.
I actually think we should not increase the record version unless there is an incompatible change. To my knowledge, that is what the record version has conveyed historically. However, I don't think there's any harm in increasing it, and I also don't think we need to solve it in this PR. Let's go with Colin's suggestion here; we can continue this discussion on the mailing list or offline.
@jsancio Thanks for your review. I have addressed or replied to your comments.
Force-pushed from b8caf3a to 2761eef.
-     changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+     changedBrokers.put(record.id(), broker.maybeCloneWith(
+         BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
+         Optional.empty()
+     ));
  }

  public void replay(UnfenceBrokerRecord record) {
      BrokerRegistration broker = getBrokerOrThrow(record.id(), record.epoch(), "unfence");
-     changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+     changedBrokers.put(record.id(), broker.maybeCloneWith(
+         BrokerRegistrationFencingChange.FENCE.asBoolean(),
+         Optional.empty()
+     ));
We have an in-flight change that swaps these enum values around. We need to be careful about the order in which we merge these PRs: #12236 (comment)
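To make the shape of the new call easier to follow, here is a minimal sketch of a `maybeCloneWith`-style method. `RegistrationSketch` is a hypothetical stand-in, not the Kafka `BrokerRegistration` class, and the semantics (an empty argument leaves the field alone, an empty result means nothing changed) are assumptions for illustration; the PR's actual behavior may differ. The sketch takes `Optional<Boolean>` directly rather than an enum, since the enum constants are exactly what #12236 is changing.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for BrokerRegistration (not the Kafka class).
final class RegistrationSketch {
    final int id;
    final boolean fenced;
    final boolean inControlledShutdown;

    RegistrationSketch(int id, boolean fenced, boolean inControlledShutdown) {
        this.id = id;
        this.fenced = fenced;
        this.inControlledShutdown = inControlledShutdown;
    }

    // Assumed semantics: an empty argument means "leave this field alone";
    // an empty result means no field actually changed.
    Optional<RegistrationSketch> maybeCloneWith(
        Optional<Boolean> fencingChange,
        Optional<Boolean> inControlledShutdownChange
    ) {
        boolean newFenced = fencingChange.orElse(fenced);
        boolean newShutdown = inControlledShutdownChange.orElse(inControlledShutdown);
        if (newFenced == fenced && newShutdown == inControlledShutdown) {
            return Optional.empty();
        }
        return Optional.of(new RegistrationSketch(id, newFenced, newShutdown));
    }
}
```

Passing the desired state explicitly keeps the call site independent of how the enum constants map to booleans, which is the part that depends on merge order.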
if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
    metadataVersion = MetadataVersion.IBP_3_0_IV1;
}
I think we need a similar change for the ClusterControlManager iterator. If I am not mistaken, it is possible for the FeatureControlManager to return UNINITIALIZED on both active and inactive controllers.
Should we document this default version in MetadataVersion instead?
cc @mumrah
Right, once that PR is merged, KRaft will implicitly be at metadata version IBP_3_0_IV1 until the controller finishes bootstrapping. This will be true on both the controller and the broker side.
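One way to centralize the default discussed above is a single helper that every caller uses instead of repeating the `UNINITIALIZED` check. The following is only a sketch: `MetadataVersionSketch`, its `SOME_LATER_VERSION` constant, and the `effective` method are hypothetical names, not part of the real `MetadataVersion` enum.

```java
// Hypothetical, reduced enum; the real MetadataVersion has many more constants.
enum MetadataVersionSketch {
    UNINITIALIZED,
    IBP_3_0_IV1,
    SOME_LATER_VERSION; // placeholder for any version set after bootstrapping

    // Treat UNINITIALIZED as IBP_3_0_IV1, mirroring the fallback in the diff above.
    static MetadataVersionSketch effective(MetadataVersionSketch reported) {
        return reported == UNINITIALIZED ? IBP_3_0_IV1 : reported;
    }
}
```

With a helper like this, the ClusterControlManager iterator and the code in the diff would share one definition of the implicit version.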
 * Returns true if the broker is in fenced state; Returns false if it is
 * not or if it does not exist.
 */
public boolean unfenced(int brokerId) {
Minor but I am pretty sure that we don't use this method anymore in src/main. If so, I say that we just remove it.
That's right. However, it is used in many places in the tests. I haven't found a good way to replace it in tests that is as convenient as this predicate. I would keep it.
dengziming left a comment
Good job, only one minor suggestion.
        .collect(Collectors.toSet());
}

private short registerBrokerRecordVersion() {
How about moving this method to MetadataVersion? The same logic is also used in BrokerRegistration.toRecord.
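A rough sketch of what that suggestion could look like: the version enum itself answers which record version to write, so both call sites delegate to it. Everything here is hypothetical (`VersionSketch`, its two constants, and the `inControlledShutdownSupported` flag); the assumption that record version 1 is tied to controlled-shutdown support is an illustration, not something stated in this thread.

```java
// Hypothetical enum standing in for MetadataVersion; constants are placeholders.
enum VersionSketch {
    OLDER(false),
    NEWER(true);

    // Assumption: the newer record version is only valid once this feature is supported.
    private final boolean inControlledShutdownSupported;

    VersionSketch(boolean inControlledShutdownSupported) {
        this.inControlledShutdownSupported = inControlledShutdownSupported;
    }

    // Single place that decides which broker registration record version to emit.
    short registerBrokerRecordVersion() {
        return inControlledShutdownSupported ? (short) 1 : (short) 0;
    }
}
```

Keeping the mapping on the enum avoids two copies of the same conditional drifting apart.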
record,
record.id(),
record.epoch(),
BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
Similar to what Jose mentioned, this will need attention if #12236 is merged first.
jsancio left a comment
LGTM. Test failures seem unrelated.
This PR implements the first part of KIP-841. Specifically, it implements the following:
The second half (the AlterPartition part) is implemented in #12181.
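As an illustration of the rule in the PR title, the sketch below checks a proposed ISR against broker state. It assumes a replica is eligible only if its broker is registered, unfenced, and not in controlled shutdown. `IsrEligibilitySketch` and `BrokerState` are hypothetical names, and this is not the controller's actual validation code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an ISR eligibility check; not the Kafka implementation.
final class IsrEligibilitySketch {
    static final class BrokerState {
        final boolean fenced;
        final boolean inControlledShutdown;

        BrokerState(boolean fenced, boolean inControlledShutdown) {
            this.fenced = fenced;
            this.inControlledShutdown = inControlledShutdown;
        }
    }

    // Assumed rule: unknown, fenced, or shutting-down brokers may not join the ISR.
    static boolean eligible(Map<Integer, BrokerState> brokers, int brokerId) {
        BrokerState state = brokers.get(brokerId);
        return state != null && !state.fenced && !state.inControlledShutdown;
    }

    // Returns the members of the proposed ISR that fail the eligibility rule.
    static List<Integer> ineligibleReplicas(Map<Integer, BrokerState> brokers, List<Integer> proposedIsr) {
        List<Integer> rejected = new ArrayList<>();
        for (int brokerId : proposedIsr) {
            if (!eligible(brokers, brokerId)) {
                rejected.add(brokerId);
            }
        }
        return rejected;
    }
}
```

A caller could reject the whole request whenever `ineligibleReplicas` returns a non-empty list.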