From 8e860e5493e151c73464954fc95411bd4b37c100 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Tue, 10 Jan 2023 15:01:30 -0500
Subject: [PATCH] KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record

---
 .../requests/BrokerRegistrationRequest.java   |  2 +-
 .../message/BrokerRegistrationRequest.json    |  4 +-
 .../kafka/server/BrokerLifecycleManager.scala | 16 +------
 .../scala/kafka/server/BrokerServer.scala     |  3 +-
 .../main/scala/kafka/server/KafkaConfig.scala | 10 +++--
 .../main/scala/kafka/server/KafkaServer.scala |  6 +--
 .../KafkaServerKRaftRegistrationTest.scala    | 33 +++++++++++++-
 .../server/BrokerLifecycleManagerTest.scala   | 10 ++---
 .../BrokerRegistrationRequestTest.scala       |  2 +-
 .../unit/kafka/server/KafkaConfigTest.scala   | 44 +++++++++++++++++++
 .../controller/ClusterControlManager.java     |  6 +--
 .../kafka/metadata/BrokerRegistration.java    | 34 +++++++-------
 .../common/metadata/RegisterBrokerRecord.json |  4 +-
 .../metadata/BrokerRegistrationTest.java      |  8 ++--
 14 files changed, 121 insertions(+), 61 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
index 9c6e57c6bb0a6..18d6a070d0557 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
@@ -36,7 +36,7 @@ public Builder(BrokerRegistrationRequestData data) {
 
         @Override
         public short oldestAllowedVersion() {
-            if (data.migratingZkBrokerEpoch() != -1) {
+            if (data.isMigratingZkBroker()) {
                 return (short) 1;
             } else {
                 return (short) 0;
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 22b0edbc64c54..98658a3f04ab4 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -52,7 +52,7 @@
     },
     { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The rack which this broker is in." },
-    { "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1",
-      "about": "If the required configurations for ZK migration are present, this value is set to the ZK broker epoch" }
+    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
+      "about": "If the required configurations for ZK migration are present, this value is set to true" }
   ]
 }
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index f560a8eafc3b9..dd3f39b156f30 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -55,8 +55,7 @@ class BrokerLifecycleManager(
   val config: KafkaConfig,
   val time: Time,
   val threadNamePrefix: Option[String],
-  val isZkBroker: Boolean,
-  val zkBrokerEpochSupplier: () => Long
+  val isZkBroker: Boolean
 ) extends Logging {
 
   val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
@@ -291,20 +290,9 @@ class BrokerLifecycleManager(
         setMinSupportedVersion(range.min()).
         setMaxSupportedVersion(range.max()))
     }
-    val migrationZkBrokerEpoch: Long = {
-      if (isZkBroker) {
-        val zkBrokerEpoch: Long = Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
-        if (zkBrokerEpoch < 0) {
-          throw new IllegalStateException("Trying to sending BrokerRegistration in migration Zk " +
-            "broker without valid zk broker epoch")
-        }
-        zkBrokerEpoch
-      } else
-        -1
-    }
     val data = new BrokerRegistrationRequestData().
       setBrokerId(nodeId).
-      setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
+      setIsMigratingZkBroker(isZkBroker).
       setClusterId(_clusterId).
       setFeatures(features).
       setIncarnationId(incarnationId).
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index eb8c58c50bf8b..fd2babd082c84 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -187,8 +187,7 @@ class BrokerServer(
       lifecycleManager = new BrokerLifecycleManager(config,
         time,
         threadNamePrefix,
-        isZkBroker = false,
-        () => -1)
+        isZkBroker = false)
 
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8097f82fa2cf9..b77846623dc9a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2090,8 +2090,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       }
       if (migrationEnabled) {
         if (zkConnect == null) {
-          throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value. " +
-            s"`${KafkaConfig.ZkConnectProp}` is required because `${KafkaConfig.MigrationEnabledProp} is set to true.")
+          throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${KafkaConfig.ZkConnectProp}` must also be set.")
         }
       }
     }
@@ -2115,6 +2114,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
      }
    }
+    def validateNonEmptyQuorumVotersForMigration(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.MigrationEnabledProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
      require(controlPlaneListenerName.isEmpty,
        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.")
@@ -2197,7 +2201,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     } else {
       // ZK-based
       if (migrationEnabled) {
-        validateNonEmptyQuorumVotersForKRaft()
+        validateNonEmptyQuorumVotersForMigration()
         require(controllerListenerNames.nonEmpty,
           s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
         require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 45a9fda1cec11..1defeb2391740 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -378,8 +378,7 @@ class KafkaServer(
         lifecycleManager = new BrokerLifecycleManager(config,
           time,
           threadNamePrefix,
-          isZkBroker = true,
-          () => kafkaController.brokerEpoch)
+          isZkBroker = true)
 
         // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
         val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
@@ -812,7 +811,7 @@ class KafkaServer(
 
       _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
 
-      if (config.migrationEnabled && lifecycleManager != null) {
+      if (config.migrationEnabled && lifecycleManager != null && metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
         // TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode)
         // For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
         // shutting down without waiting for the heartbeat to time out.
@@ -826,7 +825,6 @@ class KafkaServer(
           case e: Throwable =>
             error("Got unexpected exception waiting for controlled shutdown future", e)
         }
-        // TODO fix this ^
       }
 
       val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index f961cd4507eaa..d6f39c76f3872 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -25,7 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.MetadataVersion
-import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.Assertions.{assertThrows, fail}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{Tag, Timeout}
 
@@ -85,4 +85,35 @@ class KafkaServerKRaftRegistrationTest {
       kraftCluster.close()
     }
   }
+
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+  def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+
+      // Enable migration configs and restart brokers
+      val props = kraftCluster.controllerClientProperties()
+      val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
+    } finally {
+      zkCluster.stop()
+      kraftCluster.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 1a7569a987112..304c987d3aeb6 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
   @Test
   def testCreateAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
     manager.close()
   }
 
   @Test
   def testCreateStartAndClose(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
     assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, context.advertisedListeners,
@@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testSuccessfulRegistration(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -140,7 +140,7 @@ def testRegistrationTimeout(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
     val controllerNode = new Node(3000, "localhost", 8021)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
     context.controllerNodeProvider.node.set(controllerNode)
     def newDuplicateRegistrationResponse(): Unit = {
       context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testControlledShutdown(): Unit = {
     val context = new BrokerLifecycleManagerTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
+    val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index d7fbf644c8360..09c74d4e15587 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -110,7 +110,7 @@ class BrokerRegistrationRequestTest {
       .setBrokerId(brokerId)
       .setClusterId(clusterId)
       .setIncarnationId(Uuid.randomUuid())
-      .setMigratingZkBrokerEpoch(zkEpoch.getOrElse(-1L))
+      .setIsMigratingZkBroker(zkEpoch.isDefined)
       .setFeatures(features)
 
     Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 0137ed42eafd4..98f9f2c1fd5d0 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1635,4 +1635,48 @@ class KafkaConfigTest {
       errorMessage
     )
   }
+
+  @Test
+  def testMigrationEnabledZkMode(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+    props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
+    assertEquals(
+      "If using zookeeper.metadata.migration.enable, controller.quorum.voters must contain a parseable set of voters.",
+      assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
+
+    props.setProperty(KafkaConfig.QuorumVotersProp, "3000@localhost:9093")
+    assertEquals(
+      "requirement failed: controller.listener.names must not be empty when running in ZooKeeper migration mode: []",
+      assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
+
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    KafkaConfig.fromProps(props)
+
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version())
+    assertEquals(
+      "requirement failed: Cannot enable ZooKeeper migration without setting 'inter.broker.protocol.version' to 3.4 or higher",
+      assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
+
+    props.remove(KafkaConfig.MigrationEnabledProp)
+    assertEquals(
+      "requirement failed: controller.listener.names must be empty when not running in KRaft mode: [CONTROLLER]",
+      assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
+
+    props.remove(KafkaConfig.ControllerListenerNamesProp)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testMigrationEnabledKRaftMode(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+    props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
+
+    assertEquals(
+      "If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
+      assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
+
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    KafkaConfig.fromProps(props)
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 7a58433789b61..98981b4c1f6a9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -336,13 +336,13 @@ public ControllerResult registerBroker(
             }
         }
 
-        if (request.migratingZkBrokerEpoch() != -1 && !zkRegistrationAllowed()) {
+        if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
            throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
        }
 
        RegisterBrokerRecord record = new RegisterBrokerRecord().
            setBrokerId(brokerId).
-            setMigratingZkBrokerEpoch(request.migratingZkBrokerEpoch()).
+            setIsMigratingZkBroker(request.isMigratingZkBroker()).
            setIncarnationId(request.incarnationId()).
            setBrokerEpoch(brokerEpoch).
            setRack(request.rack());
@@ -426,7 +426,7 @@ public void replay(RegisterBrokerRecord record, long offset) {
             new BrokerRegistration(brokerId, record.brokerEpoch(),
                 record.incarnationId(), listeners, features,
                 Optional.ofNullable(record.rack()), record.fenced(),
-                record.inControlledShutdown(), BrokerRegistration.zkBrokerEpoch(record.migratingZkBrokerEpoch())));
+                record.inControlledShutdown(), record.isMigratingZkBroker()));
         if (heartbeatManager != null) {
             if (prevRegistration != null) heartbeatManager.remove(brokerId);
             heartbeatManager.register(brokerId, record.fenced());
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 3f87d2830aba4..0ece4dd94deac 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -65,7 +65,7 @@ public static Optional zkBrokerEpoch(long value) {
     private final Optional rack;
     private final boolean fenced;
     private final boolean inControlledShutdown;
-    private final Optional migratingZkBrokerEpoch;
+    private final boolean isMigratingZkBroker;
 
     // Visible for testing
     public BrokerRegistration(int id,
@@ -77,7 +77,7 @@ public BrokerRegistration(int id,
                               boolean fenced,
                               boolean inControlledShutdown) {
         this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
-            fenced, inControlledShutdown, Optional.empty());
+            fenced, inControlledShutdown, false);
     }
 
     public BrokerRegistration(int id,
@@ -88,9 +88,9 @@ public BrokerRegistration(int id,
                               Optional rack,
                               boolean fenced,
                               boolean inControlledShutdown,
-                              Optional migratingZkBrokerEpoch) {
+                              boolean isMigratingZkBroker) {
         this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
-            fenced, inControlledShutdown, migratingZkBrokerEpoch);
+            fenced, inControlledShutdown, isMigratingZkBroker);
     }
 
     // Visible for testing
@@ -102,7 +102,7 @@ public BrokerRegistration(int id,
                               Optional rack,
                               boolean fenced,
                               boolean inControlledShutdown) {
-        this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, Optional.empty());
+        this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, false);
     }
 
     public BrokerRegistration(int id,
@@ -113,7 +113,7 @@ public BrokerRegistration(int id,
                               Optional rack,
                               boolean fenced,
                               boolean inControlledShutdown,
-                              Optional migratingZkBrokerEpoch) {
+                              boolean isMigratingZkBroker) {
         this.id = id;
         this.epoch = epoch;
         this.incarnationId = incarnationId;
@@ -131,7 +131,7 @@ public BrokerRegistration(int id,
         this.rack = rack;
         this.fenced = fenced;
         this.inControlledShutdown = inControlledShutdown;
-        this.migratingZkBrokerEpoch = migratingZkBrokerEpoch;
+        this.isMigratingZkBroker = isMigratingZkBroker;
     }
 
     public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -155,7 +155,7 @@ public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
             Optional.ofNullable(record.rack()),
             record.fenced(),
             record.inControlledShutdown(),
-            zkBrokerEpoch(record.migratingZkBrokerEpoch()));
+            record.isMigratingZkBroker());
     }
 
     public int id() {
@@ -199,11 +199,7 @@ public boolean inControlledShutdown() {
     }
 
     public boolean isMigratingZkBroker() {
-        return migratingZkBrokerEpoch.isPresent();
-    }
-
-    public Optional migratingZkBrokerEpoch() {
-        return migratingZkBrokerEpoch;
+        return isMigratingZkBroker;
     }
 
     public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
@@ -222,9 +218,9 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
             }
         }
 
-        if (migratingZkBrokerEpoch.isPresent()) {
+        if (isMigratingZkBroker) {
             if (options.metadataVersion().isMigrationSupported()) {
-                registrationRecord.setMigratingZkBrokerEpoch(migratingZkBrokerEpoch.get());
+                registrationRecord.setIsMigratingZkBroker(isMigratingZkBroker);
             } else {
                 options.handleLoss("the isMigratingZkBroker state of one or more brokers");
             }
@@ -253,7 +249,7 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
     @Override
     public int hashCode() {
         return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
-            rack, fenced, inControlledShutdown, migratingZkBrokerEpoch);
+            rack, fenced, inControlledShutdown, isMigratingZkBroker);
     }
 
     @Override
@@ -268,7 +264,7 @@ public boolean equals(Object o) {
             other.rack.equals(rack) &&
             other.fenced == fenced &&
             other.inControlledShutdown == inControlledShutdown &&
-            other.migratingZkBrokerEpoch.equals(migratingZkBrokerEpoch);
+            other.isMigratingZkBroker == isMigratingZkBroker;
     }
 
     @Override
@@ -289,7 +285,7 @@ public String toString() {
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
         bld.append(", inControlledShutdown=").append(inControlledShutdown);
-        bld.append(", migratingZkBrokerEpoch=").append(migratingZkBrokerEpoch.orElse(-1L));
+        bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
         bld.append(")");
         return bld.toString();
     }
@@ -313,7 +309,7 @@ public BrokerRegistration cloneWith(
             rack,
             newFenced,
             newInControlledShutdownChange,
-            migratingZkBrokerEpoch
+            isMigratingZkBroker
         );
     }
 }
diff --git a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index f8de544127c05..66b740573d5aa 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -22,8 +22,8 @@
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The broker id." },
-    { "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
-      "about": "The ZK broker epoch if this record is for a ZK broker. Otherwise, -1" },
+    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "2+", "default": "false",
+      "about": "True if the broker is a ZK broker in migration mode. Otherwise, false" },
     { "name": "IncarnationId", "type": "uuid", "versions": "0+",
       "about": "The incarnation ID of the broker process" },
     { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 50516fbfccdc3..262c851338162 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -59,7 +59,7 @@ public class BrokerRegistrationTest {
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
             Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
                 .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
-            Optional.empty(), false, true, Optional.of(10L)));
+            Optional.empty(), false, true, true));
 
     @Test
     public void testValues() {
@@ -90,19 +90,19 @@ public void testToString() {
             "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
-            "rack=Optional.empty, fenced=true, inControlledShutdown=false, migratingZkBrokerEpoch=-1)",
+            "rack=Optional.empty, fenced=true, inControlledShutdown=false, isMigratingZkBroker=false)",
             REGISTRATIONS.get(1).toString());
         assertEquals("BrokerRegistration(id=2, epoch=0, " +
             "incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 2-3}, " +
-            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=-1)",
+            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, isMigratingZkBroker=false)",
             REGISTRATIONS.get(2).toString());
         assertEquals("BrokerRegistration(id=3, epoch=0, " +
             "incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9093)], supportedFeatures={metadata.version: 7}, " +
-            "rack=Optional.empty, fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=10)",
+            "rack=Optional.empty, fenced=false, inControlledShutdown=true, isMigratingZkBroker=true)",
             REGISTRATIONS.get(3).toString());
     }