
KAFKA-13935 Fix static usages of IBP in KRaft mode #12250

Merged
mumrah merged 17 commits into apache:trunk from mumrah:KAFKA-13935-ibp-broker-fixup
Jun 13, 2022

Conversation


@mumrah mumrah commented Jun 3, 2022

  • Set the minimum supported MetadataVersion to 3.0-IV1
  • Remove MetadataVersion.UNINITIALIZED
  • Relocate RPC version mapping for fetch protocols into MetadataVersion
  • Replace static IBP calls with dynamic calls to MetadataCache

A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeatureImage will initialize themselves with the minimum KRaft version (3.0-IV1).

The rationale for setting the minimum version to 3.0-IV1 is so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side-effect of increasing this minimum version, the feature level values decreased by one.
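The "relocate RPC version mapping into MetadataVersion" bullet can be sketched as follows. This is an illustrative enum, not Kafka's actual API (the real `MetadataVersion` lives in the server-common module and covers far more versions); the point is that the version-to-RPC mapping hangs off the version object itself, so call sites ask the current metadata version instead of a statically configured IBP.

```java
// Hypothetical sketch; enum and method names are assumptions, not Kafka's real API.
enum SketchMetadataVersion {
    IBP_0_10_0_IV0(0),
    IBP_0_10_1_IV2(1),
    IBP_0_11_0_IV0(2),
    IBP_3_0_IV1(3);

    private final int level;

    SketchMetadataVersion(int level) {
        this.level = level;
    }

    boolean isAtLeast(SketchMetadataVersion other) {
        return this.level >= other.level;
    }

    // The mapping lives on the version itself, mirroring the if/else chain
    // quoted later in this thread.
    short fetchProtocolVersion() {
        if (isAtLeast(IBP_0_11_0_IV0)) return 2;
        if (isAtLeast(IBP_0_10_1_IV2)) return 1;
        return 0;
    }
}
```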

@mumrah mumrah marked this pull request as ready for review June 4, 2022 01:39
@mumrah mumrah requested review from cmccabe and dajac June 4, 2022 01:40
@junrao junrao (Contributor) left a comment

@mumrah : Thanks for the PR. One quick comment below.

else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 2
else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) 1
else 0

val interBrokerProtocolVersion = if (processRoles.isEmpty) {
Contributor:

There are still a few places referencing this static value (Partition, GroupCoordinator, etc). Should we change them to use MetadataVersion?

Member Author (mumrah):

Yes we should fix those as well. I believe all the remaining usages are for versions prior to 3.0-IV1, so in KRaft mode we can pass a static version 3.0-IV1 (regardless of the actual MetadataVersion). Setting the IBP statically to the minimum KRaft compatible version (when in KRaft mode) also guards against any incorrect/inadvertent future usages of config.interBrokerProtocolVersion.

Basically, the PR grew pretty large/complex when I was trying to include the remaining conversions. WDYT about completing those in a separate PR?

Contributor:

@mumrah : Thanks for the explanation. Covering that in a separate PR sounds good to me.

metadataCache.setImage(newImage)

val metadataVersionLogMsg = newImage.features().metadataVersion() match {
case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
Contributor:

Does broker startup have any guarantees with respect to the metadata version in the log? For example, does the controller require the latest metadata feature record to be consumed before a broker can be unfenced? I'm mainly trying to understand if we might revert to an older version and what the consequences of that would be.

Member Author (mumrah):

The broker remains fenced until it catches up on the metadata log (up to the high watermark, I believe). The broker components that care about metadata (which is most of them) have some "initialize" or "startup" method called when we first publish metadata on a broker. This should mean that when a broker component is fully started, it will have the current metadata version of the cluster.

If a metadata version upgrade happened while a broker was initializing, I think it would either be included in the initial MetadataImage, or it would be handled separately. Either way, components which use the metadata version need to cope with the version changing at runtime.
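The pattern described above — components must cope with metadata.version changing at runtime — can be sketched like this. Class and method names are assumptions for illustration, not Kafka's real classes: components read the current version from a shared cache on every access and can register for change notifications, rather than caching a static IBP at construction time.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.IntConsumer;

// Illustrative sketch, not Kafka's actual MetadataCache.
class VersionCacheSketch {
    private volatile int level;
    private final List<IntConsumer> listeners = new CopyOnWriteArrayList<>();

    VersionCacheSketch(int initialLevel) {
        // e.g. the minimum KRaft version until the metadata log is caught up
        this.level = initialLevel;
    }

    int currentLevel() {
        return level;
    }

    void onVersionChange(IntConsumer listener) {
        listeners.add(listener);
    }

    // Called when a newly published MetadataImage carries a different version.
    void publish(int newLevel) {
        if (newLevel != level) {
            level = newLevel;
            for (IntConsumer l : listeners) {
                l.accept(newLevel);
            }
        }
    }
}
```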

Contributor:

Fair enough. I guess I was considering if we need any protection from an uninitialized metadata version in the code. We had the UNINITIALIZED sentinel before, but now it looks like we would use the min KRaft version or IBP. I don't know if there are any consequences to relying on an old version. Writes to the metadata log will go through the controller. Replication is protected by fencing. How about forwarding?

Member Author (mumrah):

The IBP is just used for bootstrapping a metadata version when upgrading from an older KRaft cluster (see my other comment). Broker and controller components will assume the minimum KRaft version until we finish loading the metadata log.

Forwarding should be okay since we don't start accepting external connections until after we've caught up on metadata in the broker.

// Enable inbound TCP connections. Each endpoint will be started only once its matching
// authorizer future is completed.
socketServer.enableRequestProcessing(authorizerFutures)

formatParser.addArgument("--release-version", "-r").
action(store()).
- help(s"A release version to use for the initial metadata.version. The default is (${MetadataVersion.latest().version()})")
+ help(s"A release version (e.g., 3.2, 3.3) to use for the initial metadata version. The default is (${MetadataVersion.latest().version()})")
Contributor:

Should we mention the minimum version for KRaft? Also, do we enforce the minimum?

@mumrah mumrah (Member Author) Jun 9, 2022:

MetadataVersion has an isKRaftSupported method, which we check in a few places (including here in StorageTool)

if (!configuredVersion.isKRaftSupported) {
  throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}")
} else {
  warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3. See kafka-storage.sh help for details.")
}
Contributor:

The message here says IBP is deprecated, but we're actually ignoring the value. Should we mention that in the message?

// Load the bootstrap metadata file or, in the case of an upgrade from older KRaft, bootstrap the
// metadata.version corresponding to a user-configured IBP.
val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
Contributor:

I'm a little confused on the use of the static IBP. In KafkaConfig, we ignore the value, but here we use it.

Member Author (mumrah):

Ah, this should be reading the original config for IBP. I'll fix this.

The purpose of this is to allow a rolling upgrade from a previous KRaft version where the IBP is explicitly set. If someone is running 3.2 with IBP of 3.2 (or 3.1, 3.0), we want to keep that same compatibility level during the upgrade to 3.3+. We do this by bootstrapping the IBP supplied by the user.

If IBP is not explicitly set, users won't be able to do a rolling upgrade to 3.3 without side effects. We should probably guard against this to prevent upgraded brokers from falling back to 3.0 while un-upgraded brokers may be on a higher version.
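The bootstrap decision described in this comment can be sketched minimally. The names below (chooseBootstrapLevel, MINIMUM_KRAFT_LEVEL) are assumptions for illustration, not Kafka's real API: an IBP the user set explicitly seeds the initial metadata.version during a rolling upgrade from older KRaft; otherwise the minimum KRaft-supported version is used.

```java
import java.util.OptionalInt;

// Hedged sketch; not Kafka's actual bootstrap code.
class BootstrapSketch {
    static final int MINIMUM_KRAFT_LEVEL = 1; // stands in for 3.0-IV1

    static int chooseBootstrapLevel(OptionalInt explicitlySetIbpLevel) {
        // Keep the user's configured compatibility level during the upgrade,
        // so upgraded brokers don't diverge from un-upgraded ones.
        return explicitlySetIbpLevel.orElse(MINIMUM_KRAFT_LEVEL);
    }
}
```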

}

@Test
def testKRaftUpdateWithIBP(): Unit = {
Contributor:

Could we have a test case with an invalid IBP version?

Member Author (mumrah):

If we have an invalid IBP here, KafkaConfig will throw a ConfigException right away. Basically, if we're in KRaft mode and IBP is user-defined as something less than 3.0 (or, 3.0-IV1 really), we won't even try to start up.

I'll see if I can add a test that initializes the controller with a bad metadata version.
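The config-time rejection described above can be sketched as follows. The class and method names are illustrative, not Kafka's actual KafkaConfig validation, and the real code throws ConfigException (IllegalArgumentException keeps this sketch free of Kafka dependencies): in KRaft mode, a user-defined IBP below the minimum KRaft level fails before the broker even tries to start.

```java
// Hedged sketch of the validation, under assumed names.
class IbpValidationSketch {
    static final int MINIMUM_KRAFT_LEVEL = 1; // stands in for 3.0-IV1

    static void validate(boolean kraftMode, Integer userDefinedIbpLevel) {
        if (kraftMode && userDefinedIbpLevel != null
                && userDefinedIbpLevel < MINIMUM_KRAFT_LEVEL) {
            throw new IllegalArgumentException(
                "A non-KRaft version given for inter.broker.protocol.version");
        }
    }
}
```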

Contributor:

A test case for KafkaConfig would be fine as well.

@hachikuji hachikuji (Contributor) left a comment:

Thanks, LGTM

@mumrah mumrah merged commit cc38405 into apache:trunk Jun 13, 2022
ableegoldman added a commit to confluentinc/kafka that referenced this pull request Jun 14, 2022
CONFLUENT: Sync from apache/kafka trunk to confluentinc/kafka master (13 Jun 2022)

apache/trunk: (7 commits)
KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN…(apache#12140)
KAFKA-10000: Exactly-once source tasks (apache#11780)
KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (apache#11473)
MINOR: Use Exit.addShutdownHook instead of directly adding hooks to R…(apache#12283)
KAFKA-13846: Adding overloaded metricOrElseCreate method (apache#12121)
KAFKA-13935 Fix static usages of IBP in KRaft mode (apache#12250)
HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (apache#12288)


Conflicts:
None
3 participants