KAFKA-13972; Ensure that replicas are stopped after cancelled reassignment#12271

Closed
hachikuji wants to merge 8 commits into apache:trunk from hachikuji:KAFKA-13972

Conversation

@hachikuji
Contributor

When a reassignment is cancelled, the controller must send StopReplica to all Adding replicas to ensure that the partition state is removed. Currently, this does not necessarily result in a bump to the leader epoch, which means that the StopReplica may be ignored by the Adding replica due to KIP-570. When this happens, the partition becomes stray and must be manually cleaned up.

We fix the problem here by ensuring that the leader epoch is bumped when a replica transitions to OfflineReplica even if the replica is not a leader or in the current ISR.
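
To make the failure mode concrete, here is a minimal sketch of the KIP-570-style epoch fence on StopReplica, using invented names rather than the actual ReplicaManager code: a request carrying the epoch the Adding replica already knows is treated as stale and dropped, while a bumped epoch passes the check.

```scala
// Toy model of the epoch fence; names are illustrative, not Kafka's.
object StopReplicaFencingSketch {
  final case class PartitionState(leaderEpoch: Int)

  // A StopReplica whose epoch is not higher than what the replica already
  // knows is treated as stale and ignored.
  def shouldAcceptStopReplica(requestLeaderEpoch: Int, current: PartitionState): Boolean =
    requestLeaderEpoch > current.leaderEpoch

  def main(args: Array[String]): Unit = {
    val addingReplica = PartitionState(leaderEpoch = 5)
    // No epoch bump: the controller resends epoch 5, the request is dropped,
    // and the partition is left as a stray on the Adding replica.
    println(shouldAcceptStopReplica(requestLeaderEpoch = 5, current = addingReplica)) // false
    // With the bump, the Adding replica accepts the StopReplica and cleans up.
    println(shouldAcceptStopReplica(requestLeaderEpoch = 6, current = addingReplica)) // true
  }
}
```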

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji hachikuji changed the title from "KAFKA-13935; Ensure that replicas are stopped after cancelled reassignment" to "KAFKA-13972; Ensure that replicas are stopped after cancelled reassignment" on Jun 8, 2022
@hachikuji
Contributor Author

hachikuji commented Jun 9, 2022

There are some failures in ControllerIntegrationTest due to the additional epoch bump. The controlled shutdown logic is not doing quite what I expected, so I need to understand it a little better. It looks to me like we are sending LeaderAndIsr requests to the shutting down replicas and relying on the check here to keep the replica from restarting itself: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1694. I had expected that we would send StopReplica instead.

Contributor

@junrao junrao left a comment

@hachikuji : Thanks for the PR. Left some comments below.

} else {
// Even if the replica is not in the ISR, we must bump the epoch to ensure the replica
// is fenced from replication and the `StopReplica` can be sent with a bumped epoch.
partition -> leaderAndIsr.newEpoch
Contributor

A few comments on this.
(1) This seems to be too low a level to do this. When we cancel a reassignment, we may remove multiple replicas from a partition. Instead of bumping up the leader epoch for every replica, it's probably better to bump up the leader epoch once. Perhaps it's better to do this at a higher level, in KafkaController.updateCurrentReassignment().
(2) Controlled shutdown also goes through the Offline transition. It's possible that the shutting down broker is out of the ISR. In that case, do we want to bump up the leader epoch?
(3) The current controlled shutdown logic also seems to have the same issue that its StopReplicaRequest will be ignored. When doing a controlled shutdown for a follower, it seems that we don't bump up the leader epoch. So, the StopReplicaRequest will be ignored by the follower?
(4) Should we do the same fix in KRaft?

Contributor Author

@hachikuji hachikuji Jun 9, 2022

@junrao Thanks for the comments. I think you are right about this issue affecting ControlledShutdown as well. I have been trying to understand that path a little better since it looks like it is not doing quite what we (or at least I) expect today. Also agree about the epoch bump being too low-level. We might need the fencing of replicas to happen at the partition level instead of the replica level. Let me take a look at this.

Interestingly, I don't think the issue affects KRaft. We bump the leader epoch in KRaft any time a replica is removed from the ISR or the replica set.

@junrao
Contributor

junrao commented Jun 9, 2022

@hachikuji : For controlled shutdown, if the shutting down broker is the leader, we send a LeaderAndIsr request to every replica. If the shutting down broker is a follower, we send LeaderAndIsr to the replicas that are not shutting down and send StopReplica to the shutting down broker.
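
For readers following along, a hedged sketch of the dispatch described above, with invented helper names rather than KafkaController's actual code:

```scala
// Simplified model of which requests controlled shutdown sends per partition.
object ControlledShutdownDispatchSketch {
  sealed trait Request
  final case class LeaderAndIsr(broker: Int) extends Request
  final case class StopReplica(broker: Int) extends Request

  def requestsFor(replicas: Seq[Int], leader: Int, shuttingDown: Int): Seq[Request] =
    if (leader == shuttingDown)
      // Leader case: leadership moves elsewhere (not modeled here) and every
      // replica is told about the new state via LeaderAndIsr.
      replicas.map(b => LeaderAndIsr(b))
    else
      // Follower case: the surviving replicas get LeaderAndIsr and the
      // shutting-down broker gets StopReplica.
      replicas.filterNot(_ == shuttingDown).map(b => LeaderAndIsr(b)) :+ StopReplica(shuttingDown)

  def main(args: Array[String]): Unit = {
    println(requestsFor(Seq(1, 2, 3), leader = 1, shuttingDown = 1))
    println(requestsFor(Seq(1, 2, 3), leader = 3, shuttingDown = 1))
  }
}
```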

@hachikuji
Contributor Author

hachikuji commented Jun 14, 2022

Some updates here. In the latest patch, I have refactored the OnlinePartition state transition in PartitionStateMachine to allow for general LeaderAndIsr changes (instead of just leader changes). This allows us to cancel or complete a reassignment in a single step and avoids the need for additional leader epoch bumps just to get unneeded replicas to accept the StopReplica request. Additionally, I found that the cancellation logic was generally unsafe: it blindly assumed that unneeded replicas can be removed from the ISR even if that leaves no replicas from the original replica set. In the latest patch, I have added validation to ensure that cancellation is safe. If it is not, we return INVALID_REPLICA_ASSIGNMENT. I found that KRaft already implements this behavior.

Additionally, since controlled shutdown had the same problem with StopReplica requests getting ignored by followers, I have used the refactored state transition logic in PartitionStateMachine to handle this case as well.
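
A rough sketch of the cancellation-safety validation described above, with assumed types rather than the controller's actual ones: cancellation is rejected when none of the original replicas remains in the ISR.

```scala
// Illustrative guard only; the actual patch performs this check in the
// controller and surfaces it as the INVALID_REPLICA_ASSIGNMENT error.
object ReassignmentCancellationGuardSketch {
  final case class ReassignmentContext(originReplicas: Seq[Int], isr: Seq[Int])

  sealed trait CancelResult
  case object CancelAllowed extends CancelResult
  case object InvalidReplicaAssignment extends CancelResult

  def validateCancellation(ctx: ReassignmentContext): CancelResult =
    if (ctx.isr.intersect(ctx.originReplicas).isEmpty) InvalidReplicaAssignment
    else CancelAllowed

  def main(args: Array[String]): Unit = {
    // Reassignment (1,2,3) -> (4,5,6) where only the adding replicas are in sync:
    // cancelling now would leave an ISR with no original replicas.
    println(validateCancellation(ReassignmentContext(Seq(1, 2, 3), Seq(4, 5)))) // InvalidReplicaAssignment
    println(validateCancellation(ReassignmentContext(Seq(1, 2, 3), Seq(2, 4)))) // CancelAllowed
  }
}
```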

Contributor

@junrao junrao left a comment

@hachikuji : Thanks for the PR. The approach does seem to address most of the issue, but it's a relatively big change. An alternative approach is to relax the check when handling StopReplica in ReplicaManager such that we accept it if the epoch doesn't change. Any downside with that approach?
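
To make the suggested relaxation concrete, a toy comparison (not the real ReplicaManager check) of the strict fence against a relaxed one that also accepts a StopReplica carrying the current, unbumped epoch:

```scala
// Strict vs. relaxed acceptance of StopReplica by leader epoch (illustrative only).
object RelaxedStopReplicaCheckSketch {
  def acceptStrict(requestEpoch: Int, currentEpoch: Int): Boolean =
    requestEpoch > currentEpoch

  def acceptRelaxed(requestEpoch: Int, currentEpoch: Int): Boolean =
    requestEpoch >= currentEpoch

  def main(args: Array[String]): Unit = {
    // Cancelled reassignment without an epoch bump: both sides see epoch 5.
    println(acceptStrict(5, 5))  // false: request ignored, replica left stray
    println(acceptRelaxed(5, 5)) // true: replica stopped without needing a bump
  }
}
```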

// Attempt to elect new leaders if needed and remove the broker from ISRs.
// This will bump the epoch and send `StopReplica` to the shutting down broker
// for any partition that it is no longer the leader of.
partitionStateMachine.handleStateChanges(
Contributor

Hmm, with this change, we are also passing partitions whose leader is not the shutting down broker to handleStateChanges(). This means that we could change the leader of a partition unnecessarily. Consider a partition with replica assignment 1,2,3 and leader 3. When handling controlled shutdown of broker 1, the new logic seems to force the leader to change to 2, which is unnecessary.

} else {
// Try to use the preferred target leader if it is live and in the ISR
val preferredTargetLeader = targetReplicas.head
if (liveTargetReplicas.contains(preferredTargetLeader) && newIsr.contains(preferredTargetLeader)) {
Contributor

Is the isr check necessary since we already verified all target replicas are in the ISR earlier?


// First we need to remove any unneeded replicas from the ISR and update the leader
// if necessary. This will ensure that `StopReplica` requests will be sent to the
// unneeded replicas, but they will not be deleted yet.
Contributor

Is this comment accurate? It seems that OnlinePartition sends StopReplica with delete to the unneeded replicas.

case Some(leadershipInfo) =>
// If we are canceling a reassignment, we need to verify that some of the original
// replicas are still in the ISR.
if (leadershipInfo.leaderAndIsr.isr.intersect(currentAssignment.originReplicas).isEmpty) {
Contributor

Is this a new constraint? Could we cancel before if one of the original replicas is not in sync?

)
}

if (stopReplicaRecipients.nonEmpty) {
Contributor

Now that we are sending StopReplicaRequest as part of the Online transition, could we update the comments above doHandleStateChanges()?

* ReplicaDeletionSuccessful -> NonExistentReplica
* -- remove the replica from the in memory partition replica assignment cache
*
* NewReplica,OnlineReplica,OfflineReplica -> UnassignedReplicas
Contributor

This change handles the reassignment case pretty well. However, there seem to be a couple of other places with a similar issue with StopReplicaRequest. We send StopReplicaRequest when a replica transitions to OfflineReplica. This is called through (1) KafkaController.processLeaderAndIsrResponseReceived() and (2) KafkaController.onBrokerFailure(). In (1), if the replica on the failed disk is not in the ISR, there is no need to remove it from the ISR and bump up the leader epoch. So, a StopReplicaRequest with the same epoch will be sent to the broker, which will then be ignored. This doesn't seem to cause a real problem since ReplicaManager already stops the replica fetcher when handling a failed disk, but it's kind of weird to send a StopReplicaRequest just to be ignored. (2) has a similar issue: if the failed broker is not in the ISR, there is no leader epoch bump, so the StopReplicaRequest will be ignored by the broker if it happens to be alive but de-registered from ZK.

Contributor Author

@hachikuji hachikuji left a comment

@junrao Yeah, I see your point. I think relaxing the StopReplica acceptance check would address the issue with controlled shutdown in the common case. I was a little surprised to see the way this worked today using LeaderAndIsr, but it seems to work well enough that we can probably get away without major changes there.

For reassignment cancellation, I do think the issues are a bit deeper than not sending StopReplica with a higher epoch. We didn't have a safe way, in general, to remove the adding replicas from the ISR. The logic in the controller was also missing error handling in some cases, so my feeling is we need most of those changes in any case.

Probably the main question is whether we could do all of this without refactoring PartitionStateMachine. The answer is probably yes. It felt simpler to me to treat assignment cancellation as a single high-level goal rather than as a sequence of individual partition and replica state changes. This also made testing easier since the transitions are not all hidden inside a function in KafkaController. On the other hand, this was an awkward fit for the replica state machine, which is why I had to introduce the UnassignedReplica state to bypass the usual state transitions. Now that the code is there, I do think it is an improvement, but it might not be worth the risk with the ZK controller on the way out. Let me know what you think.

Contributor

@junrao junrao left a comment

@hachikuji : Looked at the PR again. Overall, it does seem to be an improvement. Currently, we use PartitionStateMachine to change the leader and ReplicaStateMachine to shrink the ISR. The idea is that this allows better code sharing across different controller events (controlled shutdown, reassignment, etc.). But it turns out each case needs a bit of specialization, so we end up with a mix of shared and customized code. This makes the logic hard to follow and causes redundant requests to be sent. Perhaps it's cleaner to have all requests generated from a single place for each of the main controller events, so we could continue to pursue this approach. I am wondering if the logic in onReplicasBecomeOffline() should follow the same pattern as in ControlledShutdown and partition reassignment, where both the LeaderAndIsrRequest and the StopReplicaRequests are generated from the PartitionStateMachine.

if (!isAlterPartitionEnabled) {
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
private def cancelReassignment(
Contributor

Perhaps we could add a comment that the cancellation may fail.

@hachikuji
Contributor Author

I'm going to close this PR and submit a new one which is tailored a little more closely to the problem with cancellation. I think there's not much appetite at the moment for a big change affecting the old controller since it is on the way out.

@hachikuji hachikuji closed this Jan 12, 2023