Skip to content

KAFKA-16538; Upgrade kraft version #19416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 22, 2025

Conversation

jsancio
Copy link
Member

@jsancio jsancio commented Apr 8, 2025

This change implements upgrading the kraft version from 0 to 1. The
kraft version for the cluster metadata partition is recorded using the
KRaftVersion control record. If there is no KRaftVersion control record
the default kraft version is 0.

The kraft version is upgraded using the UpdateFeatures RPC. These RPCs
are handled by the QuorumController and FeatureControlManager. This
change adds special handling in the FeatureControlManager so that
upgrades to the kraft.version are directed to
RaftClient#upgradeKRaftVersion.

To allow the FeatureControlManager to call
RaftClient#upgradeKRaftVersion is a non-blocking fashion, the kraft
version upgrade uses optimistic locking. The call to
RaftClient#upgradeKRaftVersion does validations of the version change.
If the validations succeeds, it generates the necessary control records
and adds them to the BatchAccumulator.

Before the kraft version can be upgraded to version 1, all of the
brokers and controllers in the cluster need to support kraft version 1.
The check that all brokers support kraft version 1 is done by the
FeatureControlManager. The check that all of the controllers support
kraft version is done by KafkaRaftClient and LeaderState.

When the kraft version is 0, the kraft leader starts by assuming that
all voters do not support kraft version 1. The leader discovers which
voters support kraft version 1 through the UpdateRaftVoter RPC. The
KRaft leader handles UpdateRaftVoter RPCs by storing the updated
information in-memory until the kraft version is upgraded to version 1.
This state is stored in LeaderState and contains the latest directory
id, endpoints and supported kraft version for each voter.

Only when the KRaft leader has received an UpdateRaftVoter RPC from all
of the voters will it allow the upgrade from kraft.version 0 to 1.

@github-actions github-actions bot added core Kafka Broker kraft labels Apr 8, 2025
@jsancio jsancio changed the title Kafka 16538 upgrade kraft version KAFKA-16538; Upgrade kraft version Apr 8, 2025
Copy link
Contributor

@ahuang98 ahuang98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still reviewing, will pick up tomorrow

@@ -38,19 +40,20 @@ public class FollowerState implements EpochState {
private final Set<Integer> voters;
// Used for tracking the expiration of both the Fetch and FetchSnapshot requests
private final Timer fetchTimer;
// Used to throttle update voter request and allow for Fetch/FetchSnapshot requests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Used to track when to send another update voter request

(I understand that the follower won't always send an updateVoterRequest when the updateVoterPeriod has expired. Just feel the original comment didn't make much sense to me w/o reading the code)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

private Optional<LogOffsetMetadata> highWatermark;
// For kraft.version 0, track if the leader has recieved updated voter information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: received updated voter information from this follower

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@cmccabe
Copy link
Contributor

cmccabe commented Apr 9, 2025

Before the kraft version can be upgraded to version 1, all of the
brokers and controllers in the cluster need to support kraft version 1.
The check that all brokers support kraft version 1 is done by the
FeatureControlManager. The check that all of the controllers support
kraft version is done by KafkaRaftClient and LeaderState.

There is already code in FeatureControlManager to check that the controllers support the correct features. It relies on the KIP-919 controller registrations. Can we just use that?

The kraft version is upgraded using the UpdateFeatures RPC. These RPCs
are handled by the QuorumController and FeatureControlManager. This
change adds special handling in the FeatureControlManager so that
upgrades to the kraft.version are directed to
RaftClient#upgradeKRaftVersion.

There is a problem here, which is that I don't see how we reverse this change in the case where there is a controller failover and the changes we made were not committed.

We also need some way of waiting for this operation to complete before returning from the updateFeatures call.

A simple way of doing this would be to emit a NoOpRecord and wait for that to complete.

I also would prefer to avoid coupling between FeatureControlManager and RaftClient. It seems like we could just have an interface or something that is implemented by a simple class that calls into RaftClient.

For example if we had something like this:

interface KRaftVersionAccessor {
  KRaftVersion currentKRaftVersion();
  void setKRaftVersion(KRaftVersion);
}

we could avoid the dependency by implementing this in QuorumController in the obvious way...

Copy link
Contributor

@ahuang98 ahuang98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a few more files to review

@@ -87,28 +92,54 @@ public class LeaderState<T> implements EpochState {
// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;

/* Used to coordinate the upgrade of the kraft.version from 0 to 1. The upgrade is triggered by
* the clients to RaftClient.
* 1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thrown off a bit by the capitalization - are Voters and Version meant to reference variables in LeaderState that I'm just missing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, they are implementations of KRaftVersionUpgrade... I wonder if there is a way to make it more clear what we are referring to since the words "Voters" and "Version" are pretty generic

maybe just - if the kraft version is 0, the starting state is Voters (see KRaftVersionUpgrade for details)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Updated the comment. Hopefully, it is clearer now.

@@ -97,11 +105,16 @@ public FeatureControlManager build() {
MetadataVersion.latestProduction().featureLevel()));
quorumFeatures = new QuorumFeatures(0, localSupportedFeatures, List.of(0));
}
if (raftClient == null) {
throw new IllegalStateException("Must specify and raft client");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "a raft client"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this code.


return quorum
.maybeLeaderState()
.flatMap(LeaderState::requestedKRaftVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it might be possible we report a version that we're never able to finish upgrading to?

Copy link
Member Author

@jsancio jsancio Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We need to do that because the active controller operates on possibly uncommitted state and we want to fence any new registration that doesn't support the new finalized kraft version.

Copy link
Contributor

@ahuang98 ahuang98 Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, would it be too complicated to un-entangle this from the kraftVersion that we return on a describeFeatures admin call?
e.g. have registration logic rely on in-memory and possibly uncommitted kraft version while describeFeatures returns only committed kraft version?

this is the current logic we have in registerBroker

// Populate finalized features map with latest known kraft version for validation.
controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());

a naive solution would be to sub in raftClient.pendingKraftVersion().featureLevel()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have this as a follow up: https://issues.apache.org/jira/browse/KAFKA-18865.

There are a few places where I need to fix this.

* 1. if the kraft version is 0, the starting state is Voters. The voter set is the voters in
* the static voter set with the leader updated.
* 2. as the leader receives UpdateRaftVoter requests, it updates the associated Voters. Only
* after all of the voters have been updated will upgrades successfully complete.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: will an upgrade successfully complete.

* When {@code validateOnly} is true only the validation is perform and the control records are
* not generated.
*
* @param epoch the current epoch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is the epoch that the controller was on when the upgrade kraft version request was created?

since validateEpoch refers to epoch() as the current epoch, maybe we should change the wording of the description here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

/**
* Upgrade the kraft version.
*
* This methods upgradeds the kraft version to {@code newVersion}. If the version is already
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: upgrades

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

if (!inMemoryVoters.voters().voterIds().equals(persistedVoters.voterIds())) {
throw new IllegalStateException(
String.format(
"Unable to update %s due to missing voters %s compared to %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Unable to upgrade version for %s to %s because only voters %s have been updated out of %s"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the message to include the new version. Didn't change the rest of the phrase because it wasn't 100% accurate. This check is a programming error if it fails, hence the IllegalStateException. It is expected that the voter ids in-memory is always equals to the static voter ids.

if (!successful) {
throw new InvalidUpdateVersionException(
String.format(
"Unable to upgrade version for %s to %s due to changing voters",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unable to upgrade version for %s to %s due to a change in the in memory KRaftVersionUpgrade voter state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message is returned to the user. I don't think the user is not going to know what "in memory KRaftVersionUpgrade" means.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought "changing voters" might be a bit ambiguous as well, but can't think of any better wording

if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if there is an uncommitted voters record this update is no-op (we wait until there are only committed voters records present before updating the voters?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct and the remote voter will retry because of the error type returned.

interface KRaftVersionAccessor {
KRaftVersion kraftVersion();

void upgradeKRaftVersion(int epoch, KRaftVersion version, boolean validateOnly);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

version -> newVersion ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


import java.util.Objects;

public record Pair<T, U>(T first, U second) {
Copy link
Contributor

@cmccabe cmccabe Apr 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a code smell. Can we have a concrete class which explains what each field means, in the return value from maybeSendRequest?

Copy link
Member Author

@jsancio jsancio Apr 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Added RequestSentResult.

Copy link
Contributor

@ahuang98 ahuang98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a mandatory laptop restart, not done reviewing test files yet

Comment on lines 27 to 28
* {@code Voters} is used to stored in-memory the latest voter set. {@code Version} is used to
* stored in-memory the upgraded kraft version.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: store

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Comment on lines +116 to +117
final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
final Optional<VoterSet> voters;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, why not instantiate these both as empty optionals to remove need for lines L119 & L124

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find that harder to read and understand. I tend to prefer final variable when there are multiple initial values since the Java compiler can enforce that all possible initializations are considered.

}
} else {
inMemoryVoters = leaderState.volatileVoters();
if (inMemoryVoters.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be unexpected right? wonder what the rationale is for returning REQUEST_TIMED_OUT

and should we log an error here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen if the a remove voter send an update voter request before the leader has written the kraft version to the log and applied to the partitionState. The general flow for upgrade is:

  1. Controller thread generates the control records and appends them to the batch accumulator
  2. The kraft thread collects the control records in the batch accumulator, appends them to the log and updates the partitions state.

The kraft.version used here is not updated until (2.). Any update voter request handled between 1. and 2. the leader is asking the remove replica to retry.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the additional code comment

Comment on lines +257 to +258
* This update voter operation doesn't compare the directory id. This is useful when upgrading
* from a voter set that doesn't support directory id to one that supports directory ids.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't isVoter already take care of this?

         * If the voter node doesn't include the directory id ({@code Optional.empty()}), a replica
         * is the voter as long as the node id matches. The directory id is not checked.

It seems like we might not need a separate updateVoterIgnoringDirectoryId method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The different is a bit subtle. In kraft.version 1 all of the voters in the voter set have a directory id. To update a voter in kraft.version 1 the node id and directory id must match.

In kraft.version 0 none of the voters in the voter set have a directory id but the voter set stored KRaftVersionUpgrade may have the directory id set even though the kraft.version is 0. This is needed so that the leader can discover the directory id of all of the voters before upgrading the kraft.version.

Now lets assume that we have a cluster in kraft.version 0 with the voter id 1, 2 and 3. All of the voter are upgraded to Kafka 4.1 and they send an update voter request with the directory ids d1, d2 and d3 respectively. Lets say that the leader is 1 and the leader updates the in-memory voter set to include all of the directories. Finally, voter 3 crashes and replaces its disk. The new directory id for voter 3 is d3'. For the upgrade to be correct voter 3 needs to send an update voter request and the leader (1) needs to update the voter set to be d3' even though it already has d3 in its in-memory voter set.

updateVoterIgnoringDirectoryId is allowing the leader to set the voter set to the latest directory id even if it changes while in kraft version 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, and we allow updating a voter after a disk replacement in this manner only w/ kraft.version 0? In kraft.version 1 would we necessarily have to remove and then add voter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. After the kraft version has been upgraded to 1, if the directory id changes, the user will have to remove the voter with the old id and then add the voter with the new directory id.

This is done to make sure that the partition stays consistent after there is a disk failure.

@@ -2085,8 +2091,8 @@ void testFollowerSendsUpdateVoter() throws Exception {
.withLocalListeners(localListeners)
.build();

// waiting for 3 times the fetch timeout sends an update voter
for (int i = 0; i < 3; i++) {
// waiting for FETCH request until the UpdateRaftVoter request is set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

// waiting for 3 times the fetch timeout sends an update voter
for (int i = 0; i < 3; i++) {
// waiting for FETCH request until the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any point to this for loop if NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD is statically set to 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No but that's because I changed the update voter timer from 3 * fetch timeout to just fetch timeout. I figured that I should leave it parametrized incase we later decide to change the update voter timer.

@EnumSource(value = Errors.class, names = {"NONE", "UNSUPPORTED_VERSION"})
void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
ReplicaKey voter1 = replicaKey(local.id() + 1, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have a version of this test where the voters start out w/o directory ids?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I added a test for this.

KRaftVersion.KRAFT_VERSION_0
);

assertThrows(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, could you add short comments that explain why each is invalid?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Contributor

@ahuang98 ahuang98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Jose for the changes!

Copy link
Contributor

@cmccabe cmccabe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cmccabe cmccabe merged commit b97a130 into apache:trunk Apr 22, 2025
26 checks passed
@jsancio jsancio deleted the kafka-16538-upgrade-kraft-version branch April 23, 2025 00:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker kraft
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants