Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Builder(BrokerRegistrationRequestData data) {

@Override
public short oldestAllowedVersion() {
if (data.migratingZkBrokerEpoch() != -1) {
if (data.isMigratingZkBroker()) {
return (short) 1;
} else {
return (short) 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
},
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The rack which this broker is in." },
{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1",
"about": "If the required configurations for ZK migration are present, this value is set to the ZK broker epoch" }
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
"about": "If the required configurations for ZK migration are present, this value is set to true" }
]
}
16 changes: 2 additions & 14 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: Option[String],
val isZkBroker: Boolean,
val zkBrokerEpochSupplier: () => Long
val isZkBroker: Boolean
) extends Logging {

val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
Expand Down Expand Up @@ -291,20 +290,9 @@ class BrokerLifecycleManager(
setMinSupportedVersion(range.min()).
setMaxSupportedVersion(range.max()))
}
val migrationZkBrokerEpoch: Long = {
if (isZkBroker) {
val zkBrokerEpoch: Long = Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
if (zkBrokerEpoch < 0) {
throw new IllegalStateException("Trying to sending BrokerRegistration in migration Zk " +
"broker without valid zk broker epoch")
}
zkBrokerEpoch
} else
-1
}
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
setIsMigratingZkBroker(isZkBroker).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
threadNamePrefix,
isZkBroker = false,
() => -1)
isZkBroker = false)

/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2090,8 +2090,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
if (migrationEnabled) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value. " +
s"`${KafkaConfig.ZkConnectProp}` is required because `${KafkaConfig.MigrationEnabledProp} is set to true.")
throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${KafkaConfig.ZkConnectProp}` must also be set.")
}
}
}
Expand All @@ -2115,6 +2114,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
}
}
/**
 * Validates that a KRaft quorum is configured when ZooKeeper migration is enabled.
 * Migration requires the ZK broker to reach a KRaft controller quorum, so an empty
 * parsed voter set (missing or unparseable controller.quorum.voters) is a fatal
 * configuration error.
 */
def validateNonEmptyQuorumVotersForMigration(): Unit =
  if (voterAddressSpecsByNodeId.isEmpty)
    throw new ConfigException(s"If using ${KafkaConfig.MigrationEnabledProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
require(controlPlaneListenerName.isEmpty,
s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.")
Expand Down Expand Up @@ -2197,7 +2201,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
} else {
// ZK-based
if (migrationEnabled) {
validateNonEmptyQuorumVotersForKRaft()
validateNonEmptyQuorumVotersForMigration()
require(controllerListenerNames.nonEmpty,
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,7 @@ class KafkaServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
threadNamePrefix,
isZkBroker = true,
() => kafkaController.brokerEpoch)
isZkBroker = true)

// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
Expand Down Expand Up @@ -812,7 +811,7 @@ class KafkaServer(

_brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN

if (config.migrationEnabled && lifecycleManager != null) {
if (config.migrationEnabled && lifecycleManager != null && metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good fix, can you include it in the change description?

// TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode)
// For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
// shutting down without waiting for the heartbeat to time out.
Expand All @@ -826,7 +825,6 @@ class KafkaServer(
case e: Throwable =>
error("Got unexpected exception waiting for controlled shutdown future", e)
}
// TODO fix this ^
}

val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.Uuid
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout}

Expand Down Expand Up @@ -85,4 +85,35 @@ class KafkaServerKRaftRegistrationTest {
kraftCluster.close()
}
}

// Verifies that ZK brokers running an IBP older than 3.4 (IBP_3_3_IV0 here) are
// rejected when restarted with ZK-to-KRaft migration enabled: the rolling restart
// is expected to fail with an IllegalArgumentException rather than register with
// the KRaft controller.
@ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster.clusterId()
// Stand up a single-node KRaft controller quorum (no KRaft brokers) that shares
// the ZK cluster's ID and has migration enabled, pointing at the same ZK ensemble.
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
kraftCluster.startup()

// Enable migration configs and restart brokers
val props = kraftCluster.controllerClientProperties()
val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
// The restart must fail: config validation requires IBP >= 3.4 for migration,
// but these brokers were started with IBP_3_3_IV0 (see the ClusterTest annotation).
assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())
} finally {
// Always tear down both clusters, even when the assertion fails.
zkCluster.stop()
kraftCluster.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
manager.close()
}

@Test
def testCreateStartAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
Expand All @@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand All @@ -140,7 +140,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand Down Expand Up @@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class BrokerRegistrationRequestTest {
.setBrokerId(brokerId)
.setClusterId(clusterId)
.setIncarnationId(Uuid.randomUuid())
.setMigratingZkBrokerEpoch(zkEpoch.getOrElse(-1L))
.setIsMigratingZkBroker(zkEpoch.isDefined)
.setFeatures(features)

Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
Expand Down
44 changes: 44 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1635,4 +1635,48 @@ class KafkaConfigTest {
errorMessage
)
}

// Walks a ZK-mode broker config through the migration-enable validation rules,
// asserting the exact error at each step. The mutations are cumulative and
// order-dependent: each assertion relies on every property set before it.
@Test
def testMigrationEnabledZkMode(): Unit = {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
// Migration on its own is rejected: a KRaft quorum (controller.quorum.voters) is required.
assertEquals(
"If using zookeeper.metadata.migration.enable, controller.quorum.voters must contain a parseable set of voters.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)

// With voters present, the next missing requirement is controller.listener.names.
props.setProperty(KafkaConfig.QuorumVotersProp, "3000@localhost:9093")
assertEquals(
"requirement failed: controller.listener.names must not be empty when running in ZooKeeper migration mode: []",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)

// Voters + controller listener names: the migration config now validates cleanly.
props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
KafkaConfig.fromProps(props)

// Downgrading the IBP below 3.4 makes migration invalid again.
props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, MetadataVersion.IBP_3_3_IV0.version())
assertEquals(
"requirement failed: Cannot enable ZooKeeper migration without setting 'inter.broker.protocol.version' to 3.4 or higher",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)

// Without migration enabled, a controller listener name is illegal in plain ZK mode.
props.remove(KafkaConfig.MigrationEnabledProp)
assertEquals(
"requirement failed: controller.listener.names must be empty when not running in KRaft mode: [CONTROLLER]",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)

// Removing the controller listener restores a valid plain-ZK configuration.
props.remove(KafkaConfig.ControllerListenerNamesProp)
KafkaConfig.fromProps(props)
}

// In KRaft mode, enabling ZK migration without zookeeper.connect must be rejected
// with a ConfigException carrying a precise message; supplying the ZK connect
// string makes the same configuration valid.
@Test
def testMigrationEnabledKRaftMode(): Unit = {
val kraftConfig = new Properties()
kraftConfig.putAll(kraftProps())
kraftConfig.setProperty(KafkaConfig.MigrationEnabledProp, "true")

val thrown = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(kraftConfig))
assertEquals(
"If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
thrown.getMessage)

// Once zookeeper.connect is present, validation succeeds.
kraftConfig.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
KafkaConfig.fromProps(kraftConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,13 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
}
}

if (request.migratingZkBrokerEpoch() != -1 && !zkRegistrationAllowed()) {
if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
}

RegisterBrokerRecord record = new RegisterBrokerRecord().
setBrokerId(brokerId).
setMigratingZkBrokerEpoch(request.migratingZkBrokerEpoch()).
setIsMigratingZkBroker(request.isMigratingZkBroker()).
setIncarnationId(request.incarnationId()).
setBrokerEpoch(brokerEpoch).
setRack(request.rack());
Expand Down Expand Up @@ -426,7 +426,7 @@ public void replay(RegisterBrokerRecord record, long offset) {
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown(), BrokerRegistration.zkBrokerEpoch(record.migratingZkBrokerEpoch())));
record.inControlledShutdown(), record.isMigratingZkBroker()));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
Expand Down
Loading