KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2) #12181
dajac merged 15 commits into apache:trunk
Conversation
eventManager.put(
  AlterPartitionReceived(alterPartitionRequest.brokerId, alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
)

def alterPartitions(
Just checking my understanding. It looks like we have not modified this logic to use INELIGIBLE_REPLICA. Is that right? Should we?
That's right. Do you think we need it?
I suppose that we could have a similar race condition, especially if the shutting-down replica is not in the ISR at the time of shutdown. In that case, we don't bump the leader epoch, so the replica could make it back into the ISR before receiving the StopReplica request. We could prevent shutting-down replicas from joining the ISR, but one issue is that the leaders would never learn about this state, so they have no way to avoid unnecessary retries. This is similar to the discussion we had for KRaft.
Given that we explicitly stop replicas, I tend to believe that this race condition is less likely in ZK mode. I wonder whether it is worth fixing. What do you think?
This is somewhat related to this issue: #12271. I guess once we fix this, then relying on StopReplica and the leader epoch bump may be good enough.
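For contrast, here is a minimal sketch of the KRaft-side check being discussed. The validation actually lives in ReplicationControlManager; the helper name and predicate parameters below are hypothetical. A replica that is fenced or in controlled shutdown makes the AlterPartition request fail with INELIGIBLE_REPLICA:

object IsrEligibilitySketch {
  import org.apache.kafka.common.protocol.Errors

  // Hypothetical helper mirroring the KRaft validation described above: reject
  // the proposed ISR if any member is fenced or in controlled shutdown.
  def validateProposedIsr(
    proposedIsr: Seq[Int],
    isFenced: Int => Boolean,
    isShuttingDown: Int => Boolean
  ): Errors = {
    if (proposedIsr.exists(id => isFenced(id) || isShuttingDown(id)))
      Errors.INELIGIBLE_REPLICA
    else
      Errors.NONE
  }
}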
// In ZK mode if the deployed software of the controller uses version 2.8 or above
// but the IBP is below 2.8, the controller does not assign topic ids. In this case,
// it should not advertise the AlterPartition API version 2 and above.
val alterPartitionApiVersion = response.apiVersion(ApiKeys.ALTER_PARTITION.id)
if (alterPartitionApiVersion != null) {
  alterPartitionApiVersion.setMaxVersion(
    if (metadataVersion.isTopicIdsSupported)
      alterPartitionApiVersion.maxVersion()
    else
      1.toShort
  )
}
@hachikuji There is something that I missed in this PR. If the controller runs 2.8 software or above but does not use IBP 2.8 or above yet, topic ids are not assigned. In other words, it is not safe to use AlterPartition version 2 in this case even if the controller supports it. We don't really have a mechanism for this at the moment, so I have put the logic here. We should definitely think about a better approach. What do you think?
I have filed a JIRA for tracking this: https://issues.apache.org/jira/browse/KAFKA-13975. It seems to me that it would be better to do it separately.
I don't follow. Isn't this true for all RPCs dealing with topic ids? The sender has IBP 2.8 but the receiver doesn't support IBP 2.8. I would think that, in general, the RPC receiver needs to allow RPCs from IBP versions that are greater than the local IBP.
I recall there were some tricky cases when doing the upgrade to use topic ids. It is possible for the controller to initialize on a node with the updated IBP and create topic ids for all topics, but then fail over to a node with a lower IBP. How does the controller handle the existence of topic ids in the ZK metadata if the IBP is below 2.8? At a quick glance, it looks like it would still parse them and load them into the ControllerContext. In that scenario, it seems OK for the controller to accept AlterPartition with topic ids even if the local IBP is lower; this is how we usually deal with IBP upgrades. Are there any other scenarios we need to worry about? Perhaps upgrade_test.py is sufficient to test this one?
The other scenario that I was considering is the following:
- all brokers run software 2.8 or above
- controller upgrades to IBP 2.8 or above
- controller fails over to a node still on an IBP < 2.8; topics that already have an id keep it, as you pointed out
- new topics are created; those won't have topic ids
Is it safe? It seems to be OK because those new topics won't have a topic id, so the AlterPartitionManager will downgrade to using version 1 in this case.
OK. I have convinced myself that we don't need this check after all. The AlterPartitionManager's logic to downgrade is sufficient to handle both cases. Thanks for the clarification.
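A minimal sketch of that downgrade decision (the helpers below are hypothetical; the real logic lives in the AlterPartitionManager): if any partition in the request lacks an assigned topic id, fall back to AlterPartition version 1, which identifies partitions by name instead of id.

object AlterPartitionVersionSketch {
  import org.apache.kafka.common.{TopicIdPartition, Uuid}

  // Topic ids are only usable when every partition in the request has one
  // assigned, i.e. a non-zero Uuid.
  def canUseTopicIds(partitions: Seq[TopicIdPartition]): Boolean =
    partitions.forall(_.topicId != Uuid.ZERO_UUID)

  // Pick the highest usable request version given the partitions in the batch.
  def requestVersion(partitions: Seq[TopicIdPartition], maxSupportedVersion: Short): Short =
    if (canUseTopicIds(partitions)) maxSupportedVersion else 1.toShort
}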
…roller is not on IBP >= 2.8" This reverts commit 6addd52.
artemlivshits left a comment:
I'm wondering whether the topic id change could be done separately? It involves a lot of mechanical changes.
metadataCache match {
  // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
  // allowed to join the ISR. This does not apply to ZK mode.
  case kRaftMetadataCache: KRaftMetadataCache =>
    !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
      !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)

  case _ => true
}
Would it be better to encapsulate the KRaft / ZK behavior difference in the metadataCache? Then this function would just call the metadataCache without explicitly checking which kind of cache it is.
This is what I did originally, but we moved to this solution based on reviewers' feedback. The rationale for doing it here is that ZK does not have this information, so reviewers felt that having methods in the ZK metadata cache which are not implemented could be misleading. Personally, I am fine either way.
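For illustration, a sketch of the encapsulated alternative mentioned above (the trait, class, and method names are hypothetical, not the actual MetadataCache API): the eligibility check would live on the cache trait, and the ZK implementation, having no fencing information, would report every replica as eligible.

// Hypothetical trait-level default: ZK mode has no fencing or controlled-shutdown
// state, so every replica is considered eligible to join the ISR.
trait MetadataCacheLike {
  def isBrokerEligibleForIsr(brokerId: Int): Boolean = true
}

// The KRaft implementation overrides the default using its fencing and
// controlled-shutdown state.
class KRaftMetadataCacheLike(
  fenced: Set[Int],
  shuttingDown: Set[Int]
) extends MetadataCacheLike {
  override def isBrokerEligibleForIsr(brokerId: Int): Boolean =
    !fenced.contains(brokerId) && !shuttingDown.contains(brokerId)
}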
// and 2) that the request was not applied. Even if the controller that sent the response
// is stale, we are guaranteed from the monotonicity of the controller epoch that the
// request could not have been applied by any past or future controller.
partitionState = proposedIsrState.lastCommittedState
In KRaft mode, could the state be updated via the metadata log and applied concurrently, such that processing this would overwrite a concurrently updated state?
We roll back to the previous partition state here only if the partition state still matches our proposed partition state. If it does not, it means that the partition was updated via the metadata log in the meantime. This check is in submitAlterPartition, before calling handleAlterPartitionError.
Sounds good. This invariant isn't immediately visible from this code so maybe a comment and / or assert would make it more clear.
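A minimal, self-contained sketch of that invariant (the types below are simplified stand-ins, not the actual Partition.scala state classes): the rollback only happens while the in-flight proposal is still the current state.

object IsrRollbackSketch {
  sealed trait PartitionState
  case class Committed(isr: Set[Int]) extends PartitionState
  case class Pending(isr: Set[Int], lastCommitted: Committed) extends PartitionState

  var partitionState: PartitionState = Committed(Set(1, 2, 3))

  def handleAlterPartitionError(proposed: Pending): Unit =
    partitionState match {
      case current: Pending if current == proposed =>
        // Still our in-flight proposal: roll back to the last committed state.
        partitionState = proposed.lastCommitted
      case _ =>
        // The partition was updated via the metadata log in the meantime; keep
        // the newer state and drop the stale error.
    }
}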
hachikuji left a comment:
Thanks for the patch. Left some small comments, but LGTM overall. Will leave you to merge once addressed.
It is too late to do this now, but I do agree that this PR contained a lot of changes. It is always tricky when multiple changes are tied to bumping the version of an API. We usually prefer to do them together in order to avoid having partial updates of an API in trunk.
The failed test is not related. Merging to trunk.
@artemlivshits I merged the PR. If you have any follow-ups, I will address them separately.
This PR implements KIP-841. Specifically, it implements the following:
- In KRaft mode, replicas that are fenced or in controlled shutdown are no longer allowed to join the ISR; the controller rejects such AlterPartition requests with the new INELIGIBLE_REPLICA error.
- Partition leaders consult the KRaft metadata cache before proposing to add a replica to the ISR, so ineligible replicas are filtered out up front.
- AlterPartition version 2 identifies partitions by topic id, with the AlterPartitionManager downgrading to version 1 for topics without one.