diff --git a/build.gradle b/build.gradle
index 776bbbcc699fa..fb5756492d1fd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1046,7 +1046,7 @@ project(':core') {
task genTopicConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
- mainClass = 'kafka.log.LogConfig'
+ mainClass = 'org.apache.kafka.server.log.internals.LogConfig'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "topic_config.html").newOutputStream()
}
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 4042cba402fdd..5a92c44d7c87e 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -89,6 +89,7 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1ab694af3c964..8792c26c94976 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -280,6 +280,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
index 18d6a070d0557..9c6e57c6bb0a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
@@ -36,7 +36,7 @@ public Builder(BrokerRegistrationRequestData data) {
@Override
public short oldestAllowedVersion() {
- if (data.isMigratingZkBroker()) {
+ if (data.migratingZkBrokerEpoch() != -1) {
return (short) 1;
} else {
return (short) 0;
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 19ad8f249b3ae..22b0edbc64c54 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
{
"apiKey":62,
"type": "request",
@@ -51,7 +52,7 @@
},
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The rack which this broker is in." },
- { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
- "about": "Set by a ZK broker if the required configurations for ZK migration are present." }
+ { "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1",
+ "about": "If the required configurations for ZK migration are present, this value is set to the ZK broker epoch" }
]
}
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index 7515d5ee4bc06..7d45b0fecabb8 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Version 1 is bumped to keep the response version in sync with BrokerRegistrationRequest version 1, which adds the migrating ZK broker epoch.
{
"apiKey": 62,
"type": "response",
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index a383bbe6024e3..3da6411dcfe87 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -23,6 +23,7 @@ import java.text.NumberFormat
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import kafka.metrics.KafkaMetricsGroup
+
import kafka.server.FetchDataInfo
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.{KafkaException, TopicPartition}
diff --git a/core/src/main/scala/kafka/migration/MigrationControllerChannelContext.scala b/core/src/main/scala/kafka/migration/MigrationControllerChannelContext.scala
index cb7b9fb4f1642..835f1f60e76ee 100644
--- a/core/src/main/scala/kafka/migration/MigrationControllerChannelContext.scala
+++ b/core/src/main/scala/kafka/migration/MigrationControllerChannelContext.scala
@@ -68,13 +68,13 @@ sealed class MigrationControllerChannelContext(
}
override val liveBrokerIdAndEpochs: collection.Map[Int, Long] = {
- image.cluster().zkBrokers().asScala.map {
+ image.cluster().brokers().asScala.map {
case (brokerId, broker) => brokerId.intValue() -> broker.epoch()
}
}
override val liveOrShuttingDownBrokers: collection.Set[Broker] = {
- image.cluster().zkBrokers().asScala.values.map { registration =>
+ image.cluster().brokers().asScala.values.map { registration =>
Broker.fromBrokerRegistration(registration)
}.toSet
}
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 9c61ffd7aefc3..308d8f06f152f 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -32,10 +32,10 @@ import scala.jdk.CollectionConverters._
class MigrationPropagator(
nodeId: Int,
- config: KafkaConfig,
- metadataVersionProvider: () => MetadataVersion,
+ config: KafkaConfig
) extends LegacyPropagator {
@volatile private var _image = MetadataImage.EMPTY
+ @volatile private var metadataVersion = MetadataVersion.IBP_3_4_IV0
val stateChangeLogger = new StateChangeLogger(nodeId, inControllerContext = true, None)
val channelManager = new ControllerChannelManager(
() => _image.highestOffsetAndEpoch().epoch(),
@@ -48,7 +48,7 @@ class MigrationPropagator(
val requestBatch = new MigrationPropagatorBatch(
config,
metadataProvider,
- metadataVersionProvider,
+ () => metadataVersion,
channelManager,
stateChangeLogger
)
@@ -201,4 +201,8 @@ class MigrationPropagator(
override def clear(): Unit = {
requestBatch.clear()
}
+
+ override def setMetadataVersion(newMetadataVersion: MetadataVersion): Unit = {
+ metadataVersion = newMetadataVersion
+ }
}
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 9f361275d4ef8..f560a8eafc3b9 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -55,7 +55,8 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: Option[String],
- val isZkBroker: Boolean = false
+ val isZkBroker: Boolean,
+ val zkBrokerEpochSupplier: () => Long
) extends Logging {
val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
@@ -271,7 +272,7 @@ class BrokerLifecycleManager(
_advertisedListeners = advertisedListeners.duplicate()
_supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
if (!isZkBroker) {
- // ZK brokers don't block on registration during startup
+ // Only KRaft brokers block on registration during startup
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
@@ -290,9 +291,20 @@ class BrokerLifecycleManager(
setMinSupportedVersion(range.min()).
setMaxSupportedVersion(range.max()))
}
+ val migrationZkBrokerEpoch: Long = {
+ if (isZkBroker) {
+ val zkBrokerEpoch: Long = Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
+ if (zkBrokerEpoch < 0) {
+          throw new IllegalStateException("Trying to send BrokerRegistrationRequest in a migrating ZK " +
+            "broker without a valid ZK broker epoch")
+ }
+ zkBrokerEpoch
+ } else
+ -1
+ }
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
- setIsMigratingZkBroker(isZkBroker).
+ setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 67d27cb052931..dfe29d1ca35ec 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -103,10 +103,16 @@ class RawMetaProperties(val props: Properties = new Properties()) {
object MetaProperties {
def parse(properties: RawMetaProperties): MetaProperties = {
- properties.requireVersion(expectedVersion = 1)
val clusterId = require(ClusterIdKey, properties.clusterId)
- val nodeId = require(NodeIdKey, properties.nodeId)
- new MetaProperties(clusterId, nodeId)
+ if (properties.version == 1) {
+ val nodeId = require(NodeIdKey, properties.nodeId)
+ new MetaProperties(clusterId, nodeId)
+ } else if (properties.version == 0) {
+ val brokerId = require(BrokerIdKey, properties.brokerId)
+ new MetaProperties(clusterId, brokerId)
+ } else {
+ throw new RuntimeException(s"Expected version 0 or 1, but got version ${properties.version}")
+ }
}
def require[T](key: String, value: Option[T]): T = {
@@ -182,7 +188,8 @@ object BrokerMetadataCheckpoint extends Logging {
if (brokerMetadataMap.isEmpty) {
(new RawMetaProperties(), offlineDirs)
} else {
- val numDistinctMetaProperties = brokerMetadataMap.values.toSet.size
+ val parsedProperties = brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props)))
+ val numDistinctMetaProperties = parsedProperties.toSet.size
if (numDistinctMetaProperties > 1) {
val builder = new StringBuilder
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index cbe700358f6f6..eb8c58c50bf8b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -186,7 +186,9 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
- threadNamePrefix)
+ threadNamePrefix,
+ isZkBroker = false,
+ () => -1)
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3ae18bb001b88..c856089af3e97 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,11 +17,9 @@
package kafka.server
-import java.util.OptionalLong
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.cluster.Broker.ServerInfo
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
+import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
@@ -29,24 +27,48 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{KafkaZkClient, ZkMigrationClient}
+import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
-import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
+import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
-import scala.jdk.CollectionConverters._
+import java.util.OptionalLong
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
+
+
+case class ControllerMigrationSupport(
+ zkClient: KafkaZkClient,
+ migrationDriver: KRaftMigrationDriver,
+ brokersRpcClient: LegacyPropagator
+) {
+ def shutdown(logging: Logging): Unit = {
+ if (zkClient != null) {
+ CoreUtils.swallow(zkClient.close(), logging)
+ }
+ if (brokersRpcClient != null) {
+ CoreUtils.swallow(brokersRpcClient.shutdown(), logging)
+ }
+ if (migrationDriver != null) {
+ CoreUtils.swallow(migrationDriver.close(), logging)
+ }
+ }
+}
/**
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
@@ -81,6 +103,7 @@ class ControllerServer(
var quotaManagers: QuotaManagers = _
var controllerApis: ControllerApis = _
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
+ var migrationSupport: Option[ControllerMigrationSupport] = None
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
@@ -112,7 +135,6 @@ class ControllerServer(
maybeChangeStatus(STARTING, STARTED)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
-
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
@@ -217,6 +239,26 @@ class ControllerServer(
doRemoteKraftSetup()
}
+ if (config.migrationEnabled) {
+ val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
+ val migrationClient = new ZkMigrationClient(zkClient)
+ val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
+ val migrationDriver = new KRaftMigrationDriver(
+ config.nodeId,
+ controller.asInstanceOf[QuorumController].zkRecordConsumer(),
+ migrationClient,
+ propagator,
+ publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)),
+ sharedServer.faultHandlerFactory.build(
+ "zk migration",
+ fatal = false,
+ () => {}
+ )
+ )
+ migrationDriver.start()
+ migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
+ }
+
quotaManagers = QuotaFactory.instantiate(config,
metrics,
time,
@@ -267,6 +309,7 @@ class ControllerServer(
sharedServer.ensureNotRaftLeader()
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+ migrationSupport.foreach(_.shutdown(this))
if (controller != null)
controller.beginShutdown()
if (socketServer != null)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 662b61a5fd39a..4ec7a7d9a6cf1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -268,7 +268,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- if (isBrokerEpochStale(zkSupport, leaderAndIsrRequest.brokerEpoch)) {
+ if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received LeaderAndIsr request with broker epoch ${leaderAndIsrRequest.brokerEpoch} " +
@@ -289,7 +289,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- if (isBrokerEpochStale(zkSupport, stopReplicaRequest.brokerEpoch)) {
+ if (zkSupport.isBrokerEpochStale(stopReplicaRequest.brokerEpoch, stopReplicaRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received StopReplica request with broker epoch ${stopReplicaRequest.brokerEpoch} " +
@@ -349,7 +349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val updateMetadataRequest = request.body[UpdateMetadataRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
- if (isBrokerEpochStale(zkSupport, updateMetadataRequest.brokerEpoch)) {
+ if (zkSupport.isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received UpdateMetadata request with broker epoch ${updateMetadataRequest.brokerEpoch} " +
@@ -3176,7 +3176,7 @@ class KafkaApis(val requestChannel: RequestChannel,
describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
metadataSupport match {
- case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache) =>
+ case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
val entriesData = result.iterator.map { case (quotaEntity, quotaValues) =>
@@ -3528,17 +3528,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
-
- private def isBrokerEpochStale(zkSupport: ZkSupport, brokerEpochInRequest: Long): Boolean = {
- // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
- // if the controller hasn't been upgraded to use KIP-380
- if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
- else {
- // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
- // about the new broker epoch and sends a control request with this epoch before the broker learns about it
- brokerEpochInRequest < zkSupport.controller.brokerEpoch
- }
- }
}
object KafkaApis {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5903a495a4747..8097f82fa2cf9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2088,6 +2088,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
+ if (migrationEnabled) {
+ if (zkConnect == null) {
+ throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value. " +
+          s"`${KafkaConfig.ZkConnectProp}` is required because `${KafkaConfig.MigrationEnabledProp}` is set to true.")
+ }
+ }
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@@ -2193,7 +2199,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
if (migrationEnabled) {
validateNonEmptyQuorumVotersForKRaft()
require(controllerListenerNames.nonEmpty,
- s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZK migration mode: ${controllerListenerNames.asJava}")
+ s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
+ require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
+ s"'${KafkaConfig.InterBrokerProtocolVersionProp}' to 3.4 or higher")
} else {
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 11e81fb55ae18..45a9fda1cec11 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -192,6 +192,10 @@ class KafkaServer(
var lifecycleManager: BrokerLifecycleManager = _
+ @volatile var brokerEpochManager: ZkBrokerEpochManager = _
+
+ def brokerEpochSupplier(): Long = Option(brokerEpochManager).map(_.get()).getOrElse(-1)
+
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
@@ -210,11 +214,6 @@ class KafkaServer(
if (canStartup) {
_brokerState = BrokerState.STARTING
- lifecycleManager = new BrokerLifecycleManager(config,
- time,
- threadNamePrefix,
- isZkBroker = true)
-
/* setup zookeeper */
initZkClient(time)
configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
@@ -348,7 +347,7 @@ class KafkaServer(
time = time,
metrics = metrics,
threadNamePrefix = threadNamePrefix,
- brokerEpochSupplier = () => kafkaController.brokerEpoch
+ brokerEpochSupplier = brokerEpochSupplier
)
} else {
AlterPartitionManager(kafkaScheduler, time, zkClient)
@@ -366,7 +365,22 @@ class KafkaServer(
val zkMetaProperties = ZkMetaProperties(clusterId, config.brokerId)
checkpointBrokerMetadata(zkMetaProperties)
+ /* start token manager */
+ tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
+ tokenManager.startup()
+
+ /* start kafka controller */
+ _kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
+ kafkaController.startup()
+
if (config.migrationEnabled) {
+ logger.info("Starting up additional components for ZooKeeper migration")
+ lifecycleManager = new BrokerLifecycleManager(config,
+ time,
+ threadNamePrefix,
+ isZkBroker = true,
+ () => kafkaController.brokerEpoch)
+
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
@@ -396,6 +410,7 @@ class KafkaServer(
val listener = new OffsetTrackingListener()
raftManager.register(listener)
+ raftManager.startup()
val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep =>
@@ -419,17 +434,12 @@ class KafkaServer(
networkListeners,
ibpAsFeature
)
-
- raftManager.startup()
+ logger.debug("Start RaftManager")
}
- /* start token manager */
- tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
- tokenManager.startup()
-
- /* start kafka controller */
- _kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
- kafkaController.startup()
+ // Used by ZK brokers during a KRaft migration. When talking to a KRaft controller, we need to use the epoch
+ // from BrokerLifecycleManager rather than ZK (via KafkaController)
+ brokerEpochManager = new ZkBrokerEpochManager(metadataCache, kafkaController, Option(lifecycleManager))
adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
@@ -442,7 +452,7 @@ class KafkaServer(
val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
ProducerIdManager.rpc(
config.brokerId,
- brokerEpochSupplier = () => kafkaController.brokerEpoch,
+ brokerEpochSupplier = brokerEpochSupplier,
clientToControllerChannelManager,
config.requestTimeoutMs
)
@@ -490,7 +500,7 @@ class KafkaServer(
remoteLogManager.foreach(_.startup())
/* start processing requests */
- val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
+ val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache, brokerEpochManager)
def createKafkaApis(requestChannel: RequestChannel): KafkaApis = new KafkaApis(
requestChannel = requestChannel,
@@ -655,7 +665,7 @@ class KafkaServer(
metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
info("ZkBroker currently has a KRaft controller. Controlled shutdown will be handled " +
"through broker life cycle manager")
- return false
+ return true
}
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
@@ -808,6 +818,15 @@ class KafkaServer(
// shutting down without waiting for the heartbeat to time out.
info("Notifying KRaft of controlled shutdown")
lifecycleManager.beginControlledShutdown()
+ try {
+ lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
+ } catch {
+ case _: TimeoutException =>
+ error("Timed out waiting for the controller to approve controlled shutdown")
+ case e: Throwable =>
+ error("Got unexpected exception waiting for controlled shutdown future", e)
+ }
+        // TODO: replace this hard-coded 5-minute blocking wait with a configurable timeout and an explicit failure path
}
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala
index 41ae2a620ffdc..335df7c42d74c 100644
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ b/core/src/main/scala/kafka/server/MetadataSupport.scala
@@ -75,8 +75,10 @@ case class ZkSupport(adminManager: ZkAdminManager,
controller: KafkaController,
zkClient: KafkaZkClient,
forwardingManager: Option[ForwardingManager],
- metadataCache: ZkMetadataCache) extends MetadataSupport {
+ metadataCache: ZkMetadataCache,
+ brokerEpochManager: ZkBrokerEpochManager) extends MetadataSupport {
override def requireZkOrThrow(createException: => Exception): ZkSupport = this
+
override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException
override def ensureConsistentWith(config: KafkaConfig): Unit = {
@@ -86,9 +88,14 @@ case class ZkSupport(adminManager: ZkAdminManager,
}
override def canForward(): Boolean = forwardingManager.isDefined && (!controller.isActive)
+
+ def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: Boolean): Boolean = {
+ brokerEpochManager.isBrokerEpochStale(brokerEpochInRequest, isKRaftControllerRequest)
+ }
}
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: KRaftMetadataCache)
+case class RaftSupport(fwdMgr: ForwardingManager,
+ metadataCache: KRaftMetadataCache)
extends MetadataSupport {
override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
diff --git a/core/src/main/scala/kafka/server/ZkBrokerEpochManager.scala b/core/src/main/scala/kafka/server/ZkBrokerEpochManager.scala
new file mode 100644
index 0000000000000..36b2815444576
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ZkBrokerEpochManager.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import org.apache.kafka.common.requests.AbstractControlRequest
+
+class ZkBrokerEpochManager(metadataCache: MetadataCache,
+ controller: KafkaController,
+ lifecycleManagerOpt: Option[BrokerLifecycleManager]) {
+ def get(): Long = {
+ lifecycleManagerOpt match {
+ case Some(lifecycleManager) => metadataCache.getControllerId match {
+ case Some(_: ZkCachedControllerId) => controller.brokerEpoch
+ case Some(_: KRaftCachedControllerId) => lifecycleManager.brokerEpoch
+ case None => controller.brokerEpoch
+ }
+ case None => controller.brokerEpoch
+ }
+ }
+
+ def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: Boolean): Boolean = {
+ if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) {
+ false
+ } else if (isKRaftControllerRequest) {
+ if (lifecycleManagerOpt.isDefined) {
+ brokerEpochInRequest < lifecycleManagerOpt.get.brokerEpoch
+ } else {
+ throw new IllegalStateException("Expected BrokerLifecycleManager to be non-null.")
+ }
+ } else {
+ // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
+ // about the new broker epoch and sends a control request with this epoch before the broker learns about it
+ brokerEpochInRequest < controller.brokerEpoch
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index e2faccb1482f0..c4d075211a123 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1952,7 +1952,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def wrapMigrationRequest(request: Req, lastRequestInBatch: Boolean): MultiRequest = {
// Wrap a single request with the multi-op transactional request.
- val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.controllerZkVersion())
+ val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.zkControllerEpochZkVersion())
val migrationOp = if (lastRequestInBatch) {
SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
} else {
@@ -2037,7 +2037,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
- migrationState.controllerZkVersion() match {
+ migrationState.zkControllerEpochZkVersion() match {
case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(
s"Expected a controller epoch zkVersion when making migration writes, not -1.")
case version if version >= 0 =>
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index b0337b90062de..cc48c46ef72cb 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -1068,7 +1068,8 @@ object MigrationZNode {
metadataEpoch,
modifyTimeMs,
zkVersion,
- ZkVersion.UnknownVersion))
+ ZkMigrationLeadershipState.EMPTY.zkControllerEpoch(),
+ ZkMigrationLeadershipState.EMPTY.zkControllerEpochZkVersion()))
}.getOrElse(throw new KafkaException(s"Failed to parse the migration json $jsonDataAsString"))
}
}
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 017f773ee21c8..1ab05d5a75dc6 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
@@ -40,7 +41,10 @@ import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
-
+/**
+ * Migration client in the KRaft controller responsible for handling communication with
+ * ZooKeeper and the ZK brokers present in the cluster.
+ */
class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Logging {
override def getOrCreateMigrationRecoveryState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
@@ -54,19 +58,20 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
override def claimControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
zkClient.tryRegisterKRaftControllerAsActiveController(state.kraftControllerId(), state.kraftControllerEpoch()) match {
- case SuccessfulRegistrationResult(_, controllerEpochZkVersion) => state.withControllerZkVersion(controllerEpochZkVersion)
- case FailedRegistrationResult() => state.withControllerZkVersion(-1)
+ case SuccessfulRegistrationResult(controllerEpoch, controllerEpochZkVersion) =>
+ state.withZkController(controllerEpoch, controllerEpochZkVersion)
+ case FailedRegistrationResult() => state.withUnknownZkController()
}
}
override def releaseControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
try {
- zkClient.deleteController(state.controllerZkVersion())
- state.withControllerZkVersion(-1)
+ zkClient.deleteController(state.zkControllerEpochZkVersion())
+ state.withUnknownZkController()
} catch {
case _: ControllerMovedException =>
// If the controller moved, no need to release
- state.withControllerZkVersion(-1)
+ state.withUnknownZkController()
case t: Throwable =>
throw new RuntimeException("Could not release controller leadership due to underlying error", t)
}
@@ -267,8 +272,18 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
- val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
- state.withMigrationZkVersion(migrationZkVersion)
+ val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+ val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+ if (resultCodes(TopicZNode.path(topicName)).equals(Code.NODEEXISTS)) {
+ // topic already created, just return
+ state
+ } else if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
+ // All ZK operations succeeded; advance the migration zkVersion
+ state.withMigrationZkVersion(migrationZkVersion)
+ } else {
+ // At least one ZK operation failed; surface the result codes to the caller
+ throw new RuntimeException(s"Failed to create or update topic $topicName. ZK operation had results $resultCodes")
+ }
}
private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
@@ -314,8 +329,13 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
if (requests.isEmpty) {
state
} else {
- val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
- state.withMigrationZkVersion(migrationZkVersion)
+ val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
+ val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+ if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
+ state.withMigrationZkVersion(migrationZkVersion)
+ } else {
+ throw new RuntimeException(s"Failed to update partition states: $topicPartitions. ZK transaction had results $resultCodes")
+ }
}
}
@@ -435,4 +455,10 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
state
}
}
+
+ override def writeMetadataDeltaToZookeeper(delta: MetadataDelta,
+ image: MetadataImage,
+ state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+ state
+ }
}
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index f7eb0a501798e..e5702cf95fcf8 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -25,6 +25,7 @@
import kafka.test.ClusterInstance;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
+import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
@@ -66,11 +67,13 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
private final AtomicReference clusterReference;
+ private final AtomicReference zkReference;
private final boolean isCoResident;
public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
this.clusterConfig = clusterConfig;
this.clusterReference = new AtomicReference<>();
+ this.zkReference = new AtomicReference<>();
this.isCoResident = isCoResident;
}
@@ -84,7 +87,7 @@ public String getDisplayName(int invocationIndex) {
@Override
public List getAdditionalExtensions() {
- RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, clusterConfig);
+ RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
@@ -98,6 +101,11 @@ public List getAdditionalExtensions() {
});
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
+ if (Boolean.parseBoolean(clusterConfig.serverProperties().getProperty("zookeeper.metadata.migration.enable", "false"))) {
+ zkReference.set(new EmbeddedZookeeper());
+ builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port()));
+ }
+
// Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
// KAFKA-12512 need to pass security protocol and listener name here
@@ -120,13 +128,15 @@ public List getAdditionalExtensions() {
public static class RaftClusterInstance implements ClusterInstance {
private final AtomicReference clusterReference;
+ private final AtomicReference zkReference;
private final ClusterConfig clusterConfig;
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
private final ConcurrentLinkedQueue admins = new ConcurrentLinkedQueue<>();
- RaftClusterInstance(AtomicReference clusterReference, ClusterConfig clusterConfig) {
+ RaftClusterInstance(AtomicReference clusterReference, AtomicReference zkReference, ClusterConfig clusterConfig) {
this.clusterReference = clusterReference;
+ this.zkReference = zkReference;
this.clusterConfig = clusterConfig;
}
@@ -247,6 +257,9 @@ public void stop() {
if (stopped.compareAndSet(false, true)) {
admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));
Utils.closeQuietly(clusterReference.get(), "cluster");
+ if (zkReference.get() != null) {
+ Utils.closeQuietly(zkReference.get(), "zk");
+ }
}
}
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 0d9375dbd7c2c..5154982e47219 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -339,6 +339,5 @@ private KafkaServer findBrokerOrThrow(int brokerId) {
private Stream servers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
}
-
}
}
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 9737a0ea0596c..f961cd4507eaa 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.Uuid
import org.apache.kafka.raft.RaftConfig
@@ -55,6 +56,7 @@ class KafkaServerKRaftRegistrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+ .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -79,6 +81,7 @@ class KafkaServerKRaftRegistrationTest {
case t: Throwable => fail("Had some other error waiting for brokers", t)
}
} finally {
+ zkCluster.stop()
kraftCluster.close()
}
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index da6d9a8aa80cf..1a7569a987112 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -59,7 +59,7 @@ class BrokerLifecycleManagerTest {
def listenerName: ListenerName = new ListenerName("PLAINTEXT")
- def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
+ def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
@@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
- val manager = new BrokerLifecycleManager(context.config, context.time, None)
+ val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
manager.close()
}
@Test
def testCreateStartAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
- val manager = new BrokerLifecycleManager(context.config, context.time, None)
+ val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
@@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
- val manager = new BrokerLifecycleManager(context.config, context.time, None)
+ val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -140,7 +140,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
- val manager = new BrokerLifecycleManager(context.config, context.time, None)
+ val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
- val manager = new BrokerLifecycleManager(context.config, context.time, None)
+ val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index cab586f23e40c..d7fbf644c8360 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -92,7 +92,7 @@ class BrokerRegistrationRequestTest {
channelManager: BrokerToControllerChannelManager,
clusterId: String,
brokerId: Int,
- zk: Boolean,
+ zkEpoch: Option[Long],
ibpToSend: Option[(MetadataVersion, MetadataVersion)]
): Errors = {
val features = new BrokerRegistrationRequestData.FeatureCollection()
@@ -110,7 +110,7 @@ class BrokerRegistrationRequestTest {
.setBrokerId(brokerId)
.setClusterId(clusterId)
.setIncarnationId(Uuid.randomUuid())
- .setIsMigratingZkBroker(zk)
+ .setMigratingZkBrokerEpoch(zkEpoch.getOrElse(-1L))
.setFeatures(features)
Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
@@ -126,19 +126,19 @@ class BrokerRegistrationRequestTest {
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
+ registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, true, None))
+ registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+ registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.NONE,
- registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+ registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}
@@ -154,19 +154,19 @@ class BrokerRegistrationRequestTest {
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
+ registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, true, None))
+ registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+ registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.NONE,
- registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_4_IV0))))
+ registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}
@@ -182,19 +182,19 @@ class BrokerRegistrationRequestTest {
assertEquals(
Errors.NONE,
- registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+ registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.UNSUPPORTED_VERSION,
- registerBroker(channelManager, clusterId, 100, true, None))
+ registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.UNSUPPORTED_VERSION,
- registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_3_IV3))))
+ registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_3_IV3))))
assertEquals(
Errors.NONE,
- registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+ registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4b039b5543f7e..f80a4abf35f8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -116,6 +116,7 @@ class KafkaApisTest {
private val brokerId = 1
// KRaft tests should override this with a KRaftMetadataCache
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest())
+ private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
@@ -171,7 +172,7 @@ class KafkaApisTest {
} else {
metadataCache match {
case zkMetadataCache: ZkMetadataCache =>
- ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache)
+ ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache, brokerEpochManager)
case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
}
}
@@ -185,8 +186,8 @@ class KafkaApisTest {
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures())
new KafkaApis(
- metadataSupport = metadataSupport,
requestChannel = requestChannel,
+ metadataSupport = metadataSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = newGroupCoordinator,
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 28b592eaf7af8..a1424a269cecd 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -20,10 +20,12 @@ package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.{CoreUtils, Logging, TestUtils}
-import java.net.InetSocketAddress
+import java.net.InetSocketAddress
import org.apache.kafka.common.utils.Utils
+import java.io.Closeable
+
/**
* ZooKeeperServer wrapper that starts the server with temporary directories during construction and deletes
* the directories when `shutdown()` is called.
@@ -34,7 +36,7 @@ import org.apache.kafka.common.utils.Utils
// This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other
// projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for
// others to use.
-class EmbeddedZookeeper() extends Logging {
+class EmbeddedZookeeper() extends Closeable with Logging {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
@@ -65,5 +67,6 @@ class EmbeddedZookeeper() extends Logging {
Utils.delete(logDir)
Utils.delete(snapshotDir)
}
-
+
+ override def close(): Unit = shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index b60004e6cee72..0e6cf513b36b9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1442,8 +1442,8 @@ class KafkaZkClientTest extends QuorumTestHarness {
@Test
def testFailToUpdateMigrationZNode(): Unit = {
- val (_, stat) = zkClient.getControllerEpoch.get
- var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+ val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+ var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
migrationState = zkClient.getOrCreateMigrationState(migrationState)
assertEquals(0, migrationState.migrationZkVersion())
@@ -1455,7 +1455,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
)
- migrationState = migrationState.withControllerZkVersion(stat.getVersion)
+ migrationState = migrationState.withZkController(controllerEpoch, stat.getVersion)
zkClient.retryMigrationRequestsUntilConnected(requests_bad, migrationState) match {
case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
assertEquals(0, zkVersion)
@@ -1477,7 +1477,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
CreateRequest("/foo/bar/eggs", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
)
- migrationState = migrationState.withControllerZkVersion(stat.getVersion)
+ migrationState = migrationState.withZkController(controllerEpoch, stat.getVersion)
zkClient.retryMigrationRequestsUntilConnected(requests_good, migrationState) match {
case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
assertEquals(1, zkVersion)
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index a8493d027d5b6..b6d017877852f 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -60,8 +60,8 @@ class ZkMigrationClientTest extends QuorumTestHarness {
}
private def initialMigrationState: ZkMigrationLeadershipState = {
- val (_, stat) = zkClient.getControllerEpoch.get
- new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+ val (epoch, stat) = zkClient.getControllerEpoch.get
+ new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, epoch, stat.getVersion)
}
@Test
@@ -143,6 +143,22 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(List(1, 2, 3), partition1.isr)
}
+ @Test
+ def testIdempotentCreateTopics(): Unit = {
+ assertEquals(0, migrationState.migrationZkVersion())
+
+ val partitions = Map(
+ 0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+ 1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+ ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+ val topicId = Uuid.randomUuid()
+ migrationState = migrationClient.createTopic("test", topicId, partitions, migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+
+ migrationState = migrationClient.createTopic("test", topicId, partitions, migrationState)
+ assertEquals(1, migrationState.migrationZkVersion())
+ }
+
// Write Client Quotas using ZkMigrationClient and read them back using AdminZkClient
private def writeClientQuotaAndVerify(migrationClient: ZkMigrationClient,
adminZkClient: AdminZkClient,
@@ -217,20 +233,22 @@ class ZkMigrationClientTest extends QuorumTestHarness {
def testClaimAbsentController(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = migrationClient.claimControllerLeadership(migrationState)
- assertEquals(1, migrationState.controllerZkVersion())
+ assertEquals(1, migrationState.zkControllerEpochZkVersion())
}
@Test
def testExistingKRaftControllerClaim(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = migrationClient.claimControllerLeadership(migrationState)
- assertEquals(1, migrationState.controllerZkVersion())
+ assertEquals(1, migrationState.zkControllerEpochZkVersion())
// We don't require a KRaft controller to release the controller in ZK before another KRaft controller
// can claim it. This is because KRaft leadership comes from Raft and we are just synchronizing it to ZK.
- var otherNodeState = new ZkMigrationLeadershipState(3001, 43, 100, 42, Time.SYSTEM.milliseconds(), -1, -1)
+ var otherNodeState = ZkMigrationLeadershipState.EMPTY
+ .withNewKRaftController(3001, 43)
+ .withKRaftMetadataOffsetAndEpoch(100, 42);
otherNodeState = migrationClient.claimControllerLeadership(otherNodeState)
- assertEquals(2, otherNodeState.controllerZkVersion())
+ assertEquals(2, otherNodeState.zkControllerEpochZkVersion())
assertEquals(3001, otherNodeState.kraftControllerId())
assertEquals(43, otherNodeState.kraftControllerEpoch())
}
@@ -241,7 +259,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch)
migrationState = migrationClient.claimControllerLeadership(migrationState)
- assertEquals(1, migrationState.controllerZkVersion())
+ assertEquals(1, migrationState.zkControllerEpochZkVersion())
migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch - 1)
val t1 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState))
@@ -266,7 +284,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(zkVersion, 1)
migrationState = migrationClient.claimControllerLeadership(migrationState)
- assertEquals(2, migrationState.controllerZkVersion())
+ assertEquals(2, migrationState.zkControllerEpochZkVersion())
zkClient.getControllerEpoch match {
case Some((zkEpoch, stat)) =>
assertEquals(3, zkEpoch)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index f65fdcdaa244e..9b907ef7f9edd 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -23,6 +23,7 @@
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
+import kafka.server.ZkBrokerEpochManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
@@ -180,7 +181,8 @@ private KafkaApis createKafkaApis() {
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().
setRequestChannel(requestChannel).
- setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache)).
+ setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient,
+ Option.empty(), metadataCache, new ZkBrokerEpochManager(metadataCache, kafkaController, Option.empty()))).
setReplicaManager(replicaManager).
setGroupCoordinator(groupCoordinator).
setTxnCoordinator(transactionCoordinator).
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 966e5c2137aa4..7a58433789b61 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -336,13 +336,13 @@ public ControllerResult registerBroker(
}
}
- if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
+ if (request.migratingZkBrokerEpoch() != -1 && !zkRegistrationAllowed()) {
throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
}
RegisterBrokerRecord record = new RegisterBrokerRecord().
setBrokerId(brokerId).
- setIsMigratingZkBroker(request.isMigratingZkBroker()).
+ setMigratingZkBrokerEpoch(request.migratingZkBrokerEpoch()).
setIncarnationId(request.incarnationId()).
setBrokerEpoch(brokerEpoch).
setRack(request.rack());
@@ -420,12 +420,13 @@ public void replay(RegisterBrokerRecord record, long offset) {
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
+
// Update broker registrations.
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced(),
- record.inControlledShutdown(), record.isMigratingZkBroker()));
+ record.inControlledShutdown(), BrokerRegistration.zkBrokerEpoch(record.migratingZkBrokerEpoch())));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
index d034f42cdd4e5..60c48312038ad 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
@@ -145,6 +145,7 @@ void replay(ApiMessage message) {
case ACCESS_CONTROL_ENTRY_RECORD:
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
case NO_OP_RECORD:
+ case ZK_MIGRATION_STATE_RECORD:
// These record types do not affect metrics
break;
default:
diff --git a/metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java
new file mode 100644
index 0000000000000..7f477785129c1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineObject;
+
+public class MigrationControlManager {
+ private final TimelineObject zkMigrationState;
+
+ MigrationControlManager(SnapshotRegistry snapshotRegistry) {
+ zkMigrationState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
+ }
+
+ ZkMigrationState zkMigrationState() {
+ return zkMigrationState.get();
+ }
+
+ void replay(ZkMigrationStateRecord record) {
+ zkMigrationState.set(ZkMigrationState.of(record.zkMigrationState()));
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 97e282994716e..bf2f20e91249f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -50,8 +50,8 @@
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
-import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
@@ -65,6 +65,7 @@
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -78,6 +79,8 @@
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
@@ -566,6 +569,10 @@ ConfigurationControlManager configurationControl() {
return configurationControl;
}
+ public ZkRecordConsumer zkRecordConsumer() {
+ return zkRecordConsumer;
+ }
+
CompletableFuture appendReadEvent(
String name,
OptionalLong deadlineNs,
@@ -819,6 +826,62 @@ CompletableFuture appendWriteEvent(String name,
return event.future();
}
+ class MigrationRecordConsumer implements ZkRecordConsumer {
+ private volatile OffsetAndEpoch highestMigrationRecordOffset;
+
+ class MigrationWriteOperation implements ControllerWriteOperation {
+ private final List batch;
+
+ MigrationWriteOperation(List batch) {
+ this.batch = batch;
+ }
+ @Override
+ public ControllerResult generateRecordsAndResult() {
+ return ControllerResult.atomicOf(batch, null);
+ }
+
+ public void processBatchEndOffset(long offset) {
+ highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch);
+ }
+ }
+ @Override
+ public void beginMigration() {
+ // TODO use KIP-868 transaction
+ ControllerWriteEvent event = new ControllerWriteEvent<>("Begin ZK Migration",
+ new MigrationWriteOperation(
+ Collections.singletonList(
+ new ApiMessageAndVersion(
+ new ZkMigrationStateRecord().setZkMigrationState(ZkMigrationState.PRE_MIGRATION.value()),
+ ZkMigrationStateRecord.LOWEST_SUPPORTED_VERSION)
+ )));
+ queue.append(event);
+ }
+
+ @Override
+ public CompletableFuture> acceptBatch(List recordBatch) {
+ if (queue.size() > 100) { // TODO configure this
+ CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large"));
+ return future;
+ }
+ ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch",
+ new MigrationWriteOperation(recordBatch));
+ queue.append(batchEvent);
+ return batchEvent.future;
+ }
+
+ @Override
+ public OffsetAndEpoch completeMigration() {
+ // TODO write migration record, use KIP-868 transaction
+ return highestMigrationRecordOffset;
+ }
+
+ @Override
+ public void abortMigration() {
+ // TODO use KIP-868 transaction
+ }
+ }
+
class QuorumMetaLogListener implements RaftClient.Listener {
@Override
public void handleCommit(BatchReader reader) {
@@ -1349,6 +1412,9 @@ private void replay(ApiMessage message, Optional snapshotId, lon
case NO_OP_RECORD:
// NoOpRecord is an empty record and doesn't need to be replayed
break;
+ case ZK_MIGRATION_STATE_RECORD:
+ // TODO handle this
+ break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
@@ -1571,6 +1637,8 @@ private enum ImbalanceSchedule {
*/
private final BootstrapMetadata bootstrapMetadata;
+ private final ZkRecordConsumer zkRecordConsumer;
+
/**
* The maximum number of records per batch to allow.
*/
@@ -1672,6 +1740,7 @@ private QuorumController(
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.needToCompleteAuthorizerLoad = authorizer.isPresent();
+ this.zkRecordConsumer = new MigrationRecordConsumer();
updateWriteOffset(-1);
resetToEmptyState();
@@ -2029,11 +2098,6 @@ public int curClaimEpoch() {
return curClaimEpoch;
}
- // Visible for testing
- MetadataVersion metadataVersion() {
- return featureControl.metadataVersion();
- }
-
@Override
public void close() throws InterruptedException {
queue.close();
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index fd50b1ba3e202..523cdf9c43db5 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -64,6 +64,7 @@ public Set liveZkBrokerIdChanges() {
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
+ .filter(registration -> registration.isMigratingZkBroker() && !registration.fenced())
.map(BrokerRegistration::id)
.collect(Collectors.toSet());
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index b39a88725f081..951cea2ea15e9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -34,9 +34,15 @@ public final class ClusterImage {
public static final ClusterImage EMPTY = new ClusterImage(Collections.emptyMap());
private final Map brokers;
+ private final Map zkBrokers;
public ClusterImage(Map brokers) {
this.brokers = Collections.unmodifiableMap(brokers);
+ this.zkBrokers = Collections.unmodifiableMap(brokers
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isMigratingZkBroker())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
public boolean isEmpty() {
@@ -52,7 +58,7 @@ public Map zkBrokers() {
brokers
.entrySet()
.stream()
- .filter(x -> x.getValue().isMigratingZkBroker())
+ .filter(x -> x.getValue().isMigratingZkBroker() && !x.getValue().fenced())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 3d5ee9821ae01..ab4fd68f41a85 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -206,6 +206,9 @@ public void replay(ApiMessage record) {
* updating the highest offset and epoch.
*/
break;
+ case ZK_MIGRATION_STATE_RECORD:
+ // TODO handle this
+ break;
default:
throw new RuntimeException("Unknown metadata record type " + type);
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java b/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
index c3ab72b3b79f2..982a1f8e27180 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.LeaderAndEpoch;
import java.util.Objects;
@@ -31,6 +32,11 @@ public class LogDeltaManifest {
*/
private final MetadataProvenance provenance;
+ /**
+ * The current leader and epoch at the end of this delta.
+ */
+ private final LeaderAndEpoch leaderAndEpoch;
+
/**
* The number of batches that were loaded.
*/
@@ -48,11 +54,13 @@ public class LogDeltaManifest {
public LogDeltaManifest(
MetadataProvenance provenance,
+ LeaderAndEpoch leaderAndEpoch,
int numBatches,
long elapsedNs,
long numBytes
) {
this.provenance = provenance;
+ this.leaderAndEpoch = leaderAndEpoch;
this.numBatches = numBatches;
this.elapsedNs = elapsedNs;
this.numBytes = numBytes;
@@ -63,6 +71,10 @@ public MetadataProvenance provenance() {
return provenance;
}
+ public LeaderAndEpoch leaderAndEpoch() {
+ return leaderAndEpoch;
+ }
+
public int numBatches() {
return numBatches;
}
@@ -79,6 +91,7 @@ public long numBytes() {
public int hashCode() {
return Objects.hash(
provenance,
+ leaderAndEpoch,
numBatches,
elapsedNs,
numBytes);
@@ -89,6 +102,7 @@ public boolean equals(Object o) {
if (o == null || !o.getClass().equals(this.getClass())) return false;
LogDeltaManifest other = (LogDeltaManifest) o;
return provenance.equals(other.provenance) &&
+ leaderAndEpoch == other.leaderAndEpoch &&
numBatches == other.numBatches &&
elapsedNs == other.elapsedNs &&
numBytes == other.numBytes;
@@ -98,6 +112,7 @@ public boolean equals(Object o) {
public String toString() {
return "LogDeltaManifest(" +
"provenance=" + provenance +
+ ", leaderAndEpoch=" + leaderAndEpoch +
", numBatches=" + numBatches +
", elapsedNs=" + elapsedNs +
", numBytes=" + numBytes +
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 9bfe5a5884f8e..7fe1930c8797b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -359,6 +359,7 @@ LogDeltaManifest loadLogDelta(
long elapsedNs = time.nanoseconds() - startNs;
metrics.updateBatchProcessingTime(elapsedNs);
return new LogDeltaManifest(provenance,
+ currentLeaderAndEpoch,
numBatches,
elapsedNs,
numBytes);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 6aa40195070a6..3f87d2830aba4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -49,6 +49,14 @@ private static Map listenersToMap(Collection listene
return listenersMap;
}
+ public static Optional zkBrokerEpoch(long value) {
+ if (value == -1) {
+ return Optional.empty();
+ } else {
+ return Optional.of(value);
+ }
+ }
+
private final int id;
private final long epoch;
private final Uuid incarnationId;
@@ -57,7 +65,7 @@ private static Map listenersToMap(Collection listene
private final Optional rack;
private final boolean fenced;
private final boolean inControlledShutdown;
- private final boolean isMigratingZkBroker;
+ private final Optional migratingZkBrokerEpoch;
// Visible for testing
public BrokerRegistration(int id,
@@ -69,7 +77,7 @@ public BrokerRegistration(int id,
boolean fenced,
boolean inControlledShutdown) {
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
- fenced, inControlledShutdown, false);
+ fenced, inControlledShutdown, Optional.empty());
}
public BrokerRegistration(int id,
@@ -80,9 +88,9 @@ public BrokerRegistration(int id,
Optional rack,
boolean fenced,
boolean inControlledShutdown,
- boolean isMigratingZkBroker) {
+ Optional migratingZkBrokerEpoch) {
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
- fenced, inControlledShutdown, isMigratingZkBroker);
+ fenced, inControlledShutdown, migratingZkBrokerEpoch);
}
// Visible for testing
@@ -94,7 +102,7 @@ public BrokerRegistration(int id,
Optional rack,
boolean fenced,
boolean inControlledShutdown) {
- this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, false);
+ this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, Optional.empty());
}
public BrokerRegistration(int id,
@@ -105,7 +113,7 @@ public BrokerRegistration(int id,
Optional rack,
boolean fenced,
boolean inControlledShutdown,
- boolean isMigratingZkBroker) {
+ Optional migratingZkBrokerEpoch) {
this.id = id;
this.epoch = epoch;
this.incarnationId = incarnationId;
@@ -123,7 +131,7 @@ public BrokerRegistration(int id,
this.rack = rack;
this.fenced = fenced;
this.inControlledShutdown = inControlledShutdown;
- this.isMigratingZkBroker = isMigratingZkBroker;
+ this.migratingZkBrokerEpoch = migratingZkBrokerEpoch;
}
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -147,7 +155,7 @@ public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
Optional.ofNullable(record.rack()),
record.fenced(),
record.inControlledShutdown(),
- record.isMigratingZkBroker());
+ zkBrokerEpoch(record.migratingZkBrokerEpoch()));
}
public int id() {
@@ -191,7 +199,11 @@ public boolean inControlledShutdown() {
}
public boolean isMigratingZkBroker() {
- return isMigratingZkBroker;
+ return migratingZkBrokerEpoch.isPresent();
+ }
+
+ public Optional migratingZkBrokerEpoch() {
+ return migratingZkBrokerEpoch;
}
public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
@@ -210,9 +222,9 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
}
}
- if (isMigratingZkBroker) {
+ if (migratingZkBrokerEpoch.isPresent()) {
if (options.metadataVersion().isMigrationSupported()) {
- registrationRecord.setIsMigratingZkBroker(isMigratingZkBroker);
+ registrationRecord.setMigratingZkBrokerEpoch(migratingZkBrokerEpoch.get());
} else {
options.handleLoss("the isMigratingZkBroker state of one or more brokers");
}
@@ -241,7 +253,7 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
@Override
public int hashCode() {
return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
- rack, fenced, inControlledShutdown, isMigratingZkBroker);
+ rack, fenced, inControlledShutdown, migratingZkBrokerEpoch);
}
@Override
@@ -256,7 +268,7 @@ public boolean equals(Object o) {
other.rack.equals(rack) &&
other.fenced == fenced &&
other.inControlledShutdown == inControlledShutdown &&
- other.isMigratingZkBroker == isMigratingZkBroker;
+ other.migratingZkBrokerEpoch.equals(migratingZkBrokerEpoch);
}
@Override
@@ -277,7 +289,7 @@ public String toString() {
bld.append(", rack=").append(rack);
bld.append(", fenced=").append(fenced);
bld.append(", inControlledShutdown=").append(inControlledShutdown);
- bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
+ bld.append(", migratingZkBrokerEpoch=").append(migratingZkBrokerEpoch.orElse(-1L));
bld.append(")");
return bld.toString();
}
@@ -301,7 +313,7 @@ public BrokerRegistration cloneWith(
rack,
newFenced,
newInControlledShutdownChange,
- isMigratingZkBroker
+ migratingZkBrokerEpoch
);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
new file mode 100644
index 0000000000000..f3bdeb8150742
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.image.loader.SnapshotManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver implements MetadataPublisher {
+ private final Time time;
+ private final Logger log;
+ private final int nodeId;
+ private final MigrationClient zkMigrationClient;
+ private final LegacyPropagator propagator;
+ private final ZkRecordConsumer zkRecordConsumer;
+ private final KafkaEventQueue eventQueue;
+ private final FaultHandler faultHandler;
+ /**
+ * A callback for when the migration state has been recovered from ZK. This is used to delay the installation of this
+ * MetadataPublisher with MetadataLoader.
+ */
+ private final Consumer initialZkLoadHandler;
+ private volatile LeaderAndEpoch leaderAndEpoch;
+ private volatile MigrationState migrationState;
+ private volatile ZkMigrationLeadershipState migrationLeadershipState;
+ private volatile MetadataImage image;
+
+ public KRaftMigrationDriver(
+ int nodeId,
+ ZkRecordConsumer zkRecordConsumer,
+ MigrationClient zkMigrationClient,
+ LegacyPropagator propagator,
+ Consumer initialZkLoadHandler,
+ FaultHandler faultHandler
+ ) {
+ this.nodeId = nodeId;
+ this.zkRecordConsumer = zkRecordConsumer;
+ this.zkMigrationClient = zkMigrationClient;
+ this.propagator = propagator;
+ this.time = Time.SYSTEM;
+ this.log = LoggerFactory.getLogger(KRaftMigrationDriver.class);
+ this.migrationState = MigrationState.UNINITIALIZED;
+ this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
+ this.eventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext("KRaftMigrationDriver"), "kraft-migration");
+ this.image = MetadataImage.EMPTY;
+ this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
+ this.initialZkLoadHandler = initialZkLoadHandler;
+ this.faultHandler = faultHandler;
+ }
+
+ public void start() {
+ eventQueue.prepend(new PollEvent());
+ }
+
+ public void shutdown() throws InterruptedException {
+ eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
+ log.debug("Shutting down KRaftMigrationDriver");
+ eventQueue.close();
+ }
+
+ private void initializeMigrationState() {
+ log.info("Recovering migration state");
+ apply("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
+ String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
+ log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
+ initialZkLoadHandler.accept(this);
+ // Let's transition to INACTIVE state and wait for leadership events.
+ transitionTo(MigrationState.INACTIVE);
+ }
+
+ private boolean isControllerQuorumReadyForMigration() {
+ // TODO implement this
+ return true;
+ }
+
+ private boolean areZkBrokersReadyForMigration() {
+ if (image == MetadataImage.EMPTY) {
+ // TODO maybe add WAIT_FOR_INITIAL_METADATA_PUBLISH state to avoid this kind of check?
+ log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
+ return false;
+ }
+ Set kraftRegisteredZkBrokers = image.cluster().zkBrokers().keySet();
+ Set zkRegisteredZkBrokers = zkMigrationClient.readBrokerIdsFromTopicAssignments();
+ zkRegisteredZkBrokers.removeAll(kraftRegisteredZkBrokers);
+ if (zkRegisteredZkBrokers.isEmpty()) {
+ return true;
+ } else {
+ log.info("Still waiting for ZK brokers {} to register with KRaft.", zkRegisteredZkBrokers);
+ return false;
+ }
+ }
+
+ private void apply(String name, Function stateMutator) {
+ ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
+ ZkMigrationLeadershipState afterState = stateMutator.apply(beforeState);
+ log.trace("{} transitioned from {} to {}", name, beforeState, afterState);
+ this.migrationLeadershipState = afterState;
+ }
+
+ private boolean isValidStateChange(MigrationState newState) {
+ if (migrationState == newState)
+ return true;
+ switch (migrationState) {
+ case UNINITIALIZED:
+ case DUAL_WRITE:
+ return newState == MigrationState.INACTIVE;
+ case INACTIVE:
+ return newState == MigrationState.WAIT_FOR_CONTROLLER_QUORUM;
+ case WAIT_FOR_CONTROLLER_QUORUM:
+ return
+ newState == MigrationState.INACTIVE ||
+ newState == MigrationState.WAIT_FOR_BROKERS;
+ case WAIT_FOR_BROKERS:
+ return
+ newState == MigrationState.INACTIVE ||
+ newState == MigrationState.BECOME_CONTROLLER;
+ case BECOME_CONTROLLER:
+ return
+ newState == MigrationState.INACTIVE ||
+ newState == MigrationState.ZK_MIGRATION ||
+ newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
+ case ZK_MIGRATION:
+ return
+ newState == MigrationState.INACTIVE ||
+ newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
+ case KRAFT_CONTROLLER_TO_BROKER_COMM:
+ return
+ newState == MigrationState.INACTIVE ||
+ newState == MigrationState.DUAL_WRITE;
+ default:
+ log.error("Migration driver trying to transition from an unknown state {}", migrationState);
+ return false;
+ }
+ }
+
+ private void transitionTo(MigrationState newState) {
+ if (!isValidStateChange(newState)) {
+ log.error("Error transition in migration driver from {} to {}", migrationState, newState);
+ return;
+ }
+ if (newState != migrationState) {
+ log.debug("{} transitioning from {} to {} state", nodeId, migrationState, newState);
+ } else {
+ log.trace("{} transitioning from {} to {} state", nodeId, migrationState, newState);
+ }
+ switch (newState) {
+ case UNINITIALIZED:
 + // No state can transition to UNINITIALIZED.
+ throw new IllegalStateException("Illegal transition from " + migrationState + " to " + newState + " " +
+ "state in Zk to KRaft migration");
+ case INACTIVE:
+ // Any state can go to INACTIVE.
+ break;
+ }
+ migrationState = newState;
+ }
+
+ @Override
+ public String name() {
+ return "KRaftMigrationDriver";
+ }
+
+ @Override
+ public void publishSnapshot(MetadataDelta delta, MetadataImage newImage, SnapshotManifest manifest) {
+ eventQueue.append(new MetadataChangeEvent(delta, newImage, manifest.provenance(), true));
+ }
+
+ @Override
+ public void publishLogDelta(MetadataDelta delta, MetadataImage newImage, LogDeltaManifest manifest) {
+ if (!leaderAndEpoch.equals(manifest.leaderAndEpoch())) {
+ eventQueue.append(new KRaftLeaderEvent(manifest.leaderAndEpoch()));
+ }
+ eventQueue.append(new MetadataChangeEvent(delta, newImage, manifest.provenance(), false));
+ }
+
+
+ @Override
+ public void close() throws Exception {
+ eventQueue.close();
+ }
+
+ // Events handled by Migration Driver.
+ abstract class MigrationEvent implements EventQueue.Event {
+ @Override
+ public void handleException(Throwable e) {
+ KRaftMigrationDriver.this.faultHandler.handleFault("Error during ZK migration", e);
+ }
+ }
+
+ class PollEvent extends MigrationEvent {
+ @Override
+ public void run() throws Exception {
+ switch (migrationState) {
+ case UNINITIALIZED:
+ initializeMigrationState();
+ break;
+ case INACTIVE:
+ // Nothing to do when the driver is inactive. We need to wait on the
+ // controller node's state to move forward.
+ break;
+ case WAIT_FOR_CONTROLLER_QUORUM:
+ eventQueue.append(new WaitForControllerQuorumEvent());
+ break;
+ case BECOME_CONTROLLER:
+ eventQueue.append(new BecomeZkControllerEvent());
+ break;
+ case WAIT_FOR_BROKERS:
+ eventQueue.append(new WaitForZkBrokersEvent());
+ break;
+ case ZK_MIGRATION:
+ eventQueue.append(new MigrateMetadataEvent());
+ break;
+ case KRAFT_CONTROLLER_TO_BROKER_COMM:
+ eventQueue.append(new SendRPCsToBrokersEvent());
+ break;
+ case DUAL_WRITE:
+ // Nothing to do in the PollEvent. If there's metadata change, we use
+ // MetadataChange event to drive the writes to Zookeeper.
+ break;
+ }
+
+ // Poll again after some time
+ long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
+ eventQueue.scheduleDeferred(
+ "poll",
+ new EventQueue.DeadlineFunction(deadline),
+ new PollEvent());
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ class KRaftLeaderEvent extends MigrationEvent {
+ private final LeaderAndEpoch leaderAndEpoch;
+
+ KRaftLeaderEvent(LeaderAndEpoch leaderAndEpoch) {
+ this.leaderAndEpoch = leaderAndEpoch;
+ }
+
+ @Override
+ public void run() throws Exception {
+ // We can either be the active controller or just resigned from being the controller.
+ KRaftMigrationDriver.this.leaderAndEpoch = leaderAndEpoch;
+ boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
+ switch (migrationState) {
+ case UNINITIALIZED:
+ // Poll and retry after initialization
+ long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+ eventQueue.scheduleDeferred(
+ "poll",
+ new EventQueue.DeadlineFunction(deadline),
+ this);
+ break;
+ default:
+ if (!isActive) {
+ apply("KRaftLeaderEvent is not active", state -> ZkMigrationLeadershipState.EMPTY);
+ transitionTo(MigrationState.INACTIVE);
+ } else {
+ // Apply the new KRaft state
+ apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
 + // Before becoming the controller of ZkBrokers, we need to make sure the
+ // Controller Quorum can handle migration.
+ transitionTo(MigrationState.WAIT_FOR_CONTROLLER_QUORUM);
+ }
+ break;
+ }
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ class WaitForControllerQuorumEvent extends MigrationEvent {
+
+ @Override
+ public void run() throws Exception {
+ switch (migrationState) {
+ case WAIT_FOR_CONTROLLER_QUORUM:
+ if (isControllerQuorumReadyForMigration()) {
+ log.debug("Controller Quorum is ready for Zk to KRaft migration");
+ // Note that leadership would not change here. Hence we do not need to
+ // `apply` any leadership state change.
+ transitionTo(MigrationState.WAIT_FOR_BROKERS);
+ }
+ break;
+ default:
+ // Ignore the event as we're not trying to become controller anymore.
+ break;
+ }
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ class BecomeZkControllerEvent extends MigrationEvent {
+ @Override
+ public void run() throws Exception {
+ switch (migrationState) {
+ case BECOME_CONTROLLER:
+ // TODO: Handle unhappy path.
+ apply("BecomeZkLeaderEvent", zkMigrationClient::claimControllerLeadership);
+ if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
+ // We could not claim leadership, stay in BECOME_CONTROLLER to retry
+ } else {
+ if (!migrationLeadershipState.zkMigrationComplete()) {
+ transitionTo(MigrationState.ZK_MIGRATION);
+ } else {
+ transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+ }
+ }
+ break;
+ default:
+ // Ignore the event as we're not trying to become controller anymore.
+ break;
+ }
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ class WaitForZkBrokersEvent extends MigrationEvent {
+ @Override
+ public void run() throws Exception {
+ switch (migrationState) {
+ case WAIT_FOR_BROKERS:
+ if (areZkBrokersReadyForMigration()) {
+ log.debug("Zk brokers are registered and ready for migration");
+ transitionTo(MigrationState.BECOME_CONTROLLER);
+ }
+ break;
+ default:
+ // Ignore the event as we're not in the appropriate state anymore.
+ break;
+ }
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ class MigrateMetadataEvent extends MigrationEvent {
+ @Override
+ public void run() throws Exception {
+ Set brokersInMetadata = new HashSet<>();
+ log.info("Starting ZK migration");
+ zkRecordConsumer.beginMigration();
+ try {
+ AtomicInteger count = new AtomicInteger(0);
+ zkMigrationClient.readAllMetadata(batch -> {
+ try {
+ log.info("Migrating {} records from ZK: {}", batch.size(), batch);
+ CompletableFuture> future = zkRecordConsumer.acceptBatch(batch);
+ count.addAndGet(batch.size());
+ future.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }, brokersInMetadata::add);
+ OffsetAndEpoch offsetAndEpochAfterMigration = zkRecordConsumer.completeMigration();
+ log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " +
+ "generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
+ "migrated metadata {}.",
+ count.get(),
+ offsetAndEpochAfterMigration.offset(),
+ offsetAndEpochAfterMigration.epoch(),
+ brokersInMetadata.size(),
+ brokersInMetadata);
+ ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
+ offsetAndEpochAfterMigration.offset(),
+ offsetAndEpochAfterMigration.epoch());
+ apply("Migrate metadata from Zk", state -> zkMigrationClient.setMigrationRecoveryState(newState));
+ transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+ } catch (Throwable t) {
+ zkRecordConsumer.abortMigration();
+ // TODO ???
+ }
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+ }
+ }
+
+ class SendRPCsToBrokersEvent extends MigrationEvent {
+
+ @Override
+ public void run() throws Exception {
+ // Ignore sending RPCs to the brokers since we're no longer in the state.
+ if (migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
+ if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
+ propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
+ // Migration leadership state doesn't change since we're not doing any Zk writes.
+ transitionTo(MigrationState.DUAL_WRITE);
+ }
+ }
+ }
+ }
+
+ class MetadataChangeEvent extends MigrationEvent {
+ private final MetadataDelta delta;
+ private final MetadataImage image;
+ private final MetadataProvenance provenance;
+ private final boolean isSnapshot;
+
+ MetadataChangeEvent(MetadataDelta delta, MetadataImage image, MetadataProvenance provenance, boolean isSnapshot) {
+ this.delta = delta;
+ this.image = image;
+ this.provenance = provenance;
+ this.isSnapshot = isSnapshot;
+ }
+
+ @Override
+ public void run() throws Exception {
+ KRaftMigrationDriver.this.image = image;
+
+ if (migrationState != MigrationState.DUAL_WRITE) {
+ log.trace("Received metadata change, but the controller is not in dual-write " +
+ "mode. Ignoring the change to be replicated to Zookeeper");
+ return;
+ }
+ if (delta.featuresDelta() != null) {
+ propagator.setMetadataVersion(image.features().metadataVersion());
+ }
+
+ if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
+ if (delta.topicsDelta() != null) {
+ delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+ if (delta.topicsDelta().createdTopicIds().contains(topicId)) {
+ apply("Create topic " + topicDelta.name(), migrationState ->
+ zkMigrationClient.createTopic(
+ topicDelta.name(),
+ topicId,
+ topicDelta.partitionChanges(),
+ migrationState));
+ } else {
+ apply("Updating topic " + topicDelta.name(), migrationState ->
+ zkMigrationClient.updateTopicPartitions(
+ Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
+ migrationState));
+ }
+ });
+ }
+
+
+ apply("Write MetadataDelta to Zk", state -> zkMigrationClient.writeMetadataDeltaToZookeeper(delta, image, state));
+ // TODO: Unhappy path: Probably relinquish leadership and let new controller
+ // retry the write?
+ propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
+ migrationLeadershipState.zkControllerEpoch());
+ } else {
+ String metadataType = isSnapshot ? "snapshot" : "delta";
+ log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
+ }
+ }
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/LegacyPropagator.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/LegacyPropagator.java
index c2bd4e6f3d0a1..6fe707099a3c9 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/LegacyPropagator.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/LegacyPropagator.java
@@ -18,6 +18,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.MetadataVersion;
public interface LegacyPropagator {
@@ -34,4 +35,6 @@ void sendRPCsToBrokersFromMetadataDelta(MetadataDelta delta,
void sendRPCsToBrokersFromMetadataImage(MetadataImage image, int zkControllerEpoch);
void clear();
+
+ void setMetadataVersion(MetadataVersion metadataVersion);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 5eccbc70625ce..1710458cd57fd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -17,6 +17,8 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -88,4 +90,15 @@ ZkMigrationLeadershipState updateTopicPartitions(
Set<Integer> readBrokerIds();
Set<Integer> readBrokerIdsFromTopicAssignments();
+
+ /**
+ * Convert the Metadata delta to Zookeeper writes and persist the changes. On successful
+ * write, update the migration state with new metadata offset and epoch.
+ * @param delta Changes in the cluster metadata
+ * @param image New metadata after the changes in `delta` are applied
+ * @param state Current migration state before writing to Zookeeper.
+ */
+ ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(MetadataDelta delta,
+ MetadataImage image,
+ ZkMigrationLeadershipState state);
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationState.java
new file mode 100644
index 0000000000000..5a6d3df24a3f9
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationState.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+/**
+ * UNINITIALIZED───────────────►INACTIVE◄────────────────DUAL_WRITE◄────────────────────────┐
+ * │ ▲ │
+ * │ │ │
+ * │ │ │
+ * │ │◄───────────────────────────────────────────────────────┤
+ * │ │ │
+ * ▼ │ │
+ * WAIT_FOR_CONTROLLER_QUORUM───────────┘◄────────────────────ZK_MIGRATION────────────►KRAFT_CONTROLLER_TO_BROKER_COMM
+ * │ ▲ ▲ ▲
+ * │ │ │ │
+ * │ │ │ │
+ * │ │◄────────────────────────┤ │
+ * │ │ │ │
+ * ▼ │ │ │
+ * BECOME_CONTROLLER───────────────────►└────────────────────►WAIT_FOR_BROKERS───────────────────┘
+ */
+public enum MigrationState {
+ UNINITIALIZED(false), // Initial state.
+ INACTIVE(false), // State when not the active controller.
+ WAIT_FOR_CONTROLLER_QUORUM(false), // Ensure all the quorum nodes are ready for migration.
+ WAIT_FOR_BROKERS(false), // Wait for Zk brokers to be ready for migration.
+ BECOME_CONTROLLER(false), // Become controller for the Zk Brokers.
+ ZK_MIGRATION(true), // The cluster has satisfied the migration criteria
+ KRAFT_CONTROLLER_TO_BROKER_COMM(true), // First communication from Controller to send full RPCs to the Zk brokers.
+ DUAL_WRITE(true); // The data has been migrated
+
+ private final boolean isActiveController;
+
+ MigrationState(boolean isActiveController) {
+ this.isActiveController = isActiveController;
+ }
+
+ boolean isActiveController() {
+ return isActiveController;
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
index b6217ee80c2dd..1d27a4e41cdaf 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.metadata.migration;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
import java.util.Objects;
/**
@@ -25,7 +27,9 @@
*/
public class ZkMigrationLeadershipState {
- public static final ZkMigrationLeadershipState EMPTY = new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -1, -1);
+ // Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
+ // when doing ZK writes
+ public static final ZkMigrationLeadershipState EMPTY = new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, -2);
private final int kraftControllerId;
@@ -39,36 +43,59 @@ public class ZkMigrationLeadershipState {
private final int migrationZkVersion;
- private final int controllerZkVersion;
+ private final int zkControllerEpoch;
+
+ private final int zkControllerEpochZkVersion;
+
public ZkMigrationLeadershipState(int kraftControllerId, int kraftControllerEpoch,
long kraftMetadataOffset, int kraftMetadataEpoch,
- long lastUpdatedTimeMs, int migrationZkVersion, int controllerZkVersion) {
+ long lastUpdatedTimeMs, int migrationZkVersion,
+ int zkControllerEpoch, int zkControllerEpochZkVersion) {
this.kraftControllerId = kraftControllerId;
this.kraftControllerEpoch = kraftControllerEpoch;
this.kraftMetadataOffset = kraftMetadataOffset;
this.kraftMetadataEpoch = kraftMetadataEpoch;
this.lastUpdatedTimeMs = lastUpdatedTimeMs;
this.migrationZkVersion = migrationZkVersion;
- this.controllerZkVersion = controllerZkVersion;
+ this.zkControllerEpoch = zkControllerEpoch;
+ this.zkControllerEpochZkVersion = zkControllerEpochZkVersion;
}
public ZkMigrationLeadershipState withMigrationZkVersion(int zkVersion) {
return new ZkMigrationLeadershipState(
this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.controllerZkVersion);
+ this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
}
- public ZkMigrationLeadershipState withControllerZkVersion(int zkVersion) {
+ public ZkMigrationLeadershipState withZkController(int zkControllerEpoch, int zkControllerEpochZkVersion) {
return new ZkMigrationLeadershipState(
this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkVersion);
+ this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkControllerEpoch, zkControllerEpochZkVersion);
}
+ public ZkMigrationLeadershipState withUnknownZkController() {
+ return withZkController(EMPTY.zkControllerEpoch, EMPTY.zkControllerEpochZkVersion);
+ }
+
+
public ZkMigrationLeadershipState withNewKRaftController(int controllerId, int controllerEpoch) {
return new ZkMigrationLeadershipState(
controllerId, controllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.controllerZkVersion);
+ this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
+ }
+
+ public ZkMigrationLeadershipState withKRaftMetadataOffsetAndEpoch(long metadataOffset,
+ int metadataEpoch) {
+ return new ZkMigrationLeadershipState(
+ this.kraftControllerId,
+ this.kraftControllerEpoch,
+ metadataOffset,
+ metadataEpoch,
+ this.lastUpdatedTimeMs,
+ this.migrationZkVersion,
+ this.zkControllerEpoch,
+ this.zkControllerEpochZkVersion);
}
public int kraftControllerId() {
@@ -95,14 +122,22 @@ public int migrationZkVersion() {
return migrationZkVersion;
}
- public int controllerZkVersion() {
- return controllerZkVersion;
+ public int zkControllerEpoch() {
+ return zkControllerEpoch;
+ }
+
+ public int zkControllerEpochZkVersion() {
+ return zkControllerEpochZkVersion;
}
public boolean zkMigrationComplete() {
return kraftMetadataOffset > 0;
}
+ public OffsetAndEpoch offsetAndEpoch() {
+ return new OffsetAndEpoch(kraftMetadataOffset, kraftMetadataEpoch);
+ }
+
@Override
public String toString() {
return "ZkMigrationLeadershipState{" +
@@ -112,7 +147,8 @@ public String toString() {
", kraftMetadataEpoch=" + kraftMetadataEpoch +
", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
", migrationZkVersion=" + migrationZkVersion +
- ", controllerZkVersion=" + controllerZkVersion +
+ ", controllerZkEpoch=" + zkControllerEpoch +
+ ", controllerZkVersion=" + zkControllerEpochZkVersion +
'}';
}
@@ -127,7 +163,8 @@ public boolean equals(Object o) {
&& kraftMetadataEpoch == that.kraftMetadataEpoch
&& lastUpdatedTimeMs == that.lastUpdatedTimeMs
&& migrationZkVersion == that.migrationZkVersion
- && controllerZkVersion == that.controllerZkVersion;
+ && zkControllerEpoch == that.zkControllerEpoch
+ && zkControllerEpochZkVersion == that.zkControllerEpochZkVersion;
}
@Override
@@ -139,6 +176,7 @@ public int hashCode() {
kraftMetadataEpoch,
lastUpdatedTimeMs,
migrationZkVersion,
- controllerZkVersion);
+ zkControllerEpoch,
+ zkControllerEpochZkVersion);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java
new file mode 100644
index 0000000000000..4e35b719d14ba
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public interface ZkRecordConsumer {
+ void beginMigration();
+ CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch);
+ OffsetAndEpoch completeMigration();
+ void abortMigration();
+}
diff --git a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index 80657abb5feb4..f8de544127c05 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -22,8 +22,8 @@
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The broker id." },
- { "name": "IsMigratingZkBroker", "type": "bool", "versions": "2+", "default": false,
- "about": "True if the registering broker is a ZK broker." },
+ { "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
+ "about": "The ZK broker epoch if this record is for a ZK broker. Otherwise, -1" },
{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
"about": "The incarnation ID of the broker process" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+",
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 585f1dc40a7e1..1aa86738f980b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -28,6 +28,7 @@
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -412,7 +413,7 @@ public void testLoadEmptyBatch() throws Exception {
assertEquals(400L, loader.lastAppliedOffset());
}
assertTrue(publishers.get(0).closed);
- assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), 1,
+ assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
3000000L, 10),
publishers.get(0).latestLogDeltaManifest);
assertEquals(MetadataVersion.IBP_3_3_IV1,
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index 42fb8d92f6eba..47befabcaba2a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -23,6 +23,7 @@
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
@@ -99,14 +100,14 @@ public void testCreateSnapshot() throws Exception {
build()) {
// Publish a log delta batch. This one will not trigger a snapshot yet.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
// Publish a log delta batch. This will trigger a snapshot.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
// Publish a log delta batch. This one will be ignored because there are other images
// queued for writing.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 2000));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 2000));
assertEquals(Collections.emptyList(), emitter.images());
emitter.setReady();
}
@@ -128,7 +129,7 @@ public void testSnapshotsDisabled() throws Exception {
disabledReason.compareAndSet(null, "we are testing disable()");
// No snapshots are generated because snapshots are disabled.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
}
assertEquals(Collections.emptyList(), emitter.images());
faultHandler.maybeRethrowFirstException();
@@ -147,17 +148,17 @@ public void testTimeBasedSnapshots() throws Exception {
build()) {
// This image isn't published yet.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 50));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 50));
assertEquals(Collections.emptyList(), emitter.images());
mockTime.sleep(TimeUnit.MINUTES.toNanos(40));
// Next image is published because of the time delay.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 50));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 50));
TestUtils.waitForCondition(() -> emitter.images().size() == 1, "images.size == 1");
// bytesSinceLastSnapshot was reset to 0 by the previous snapshot,
// so this does not trigger a new snapshot.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 150));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 150));
}
assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
faultHandler.maybeRethrowFirstException();
@@ -173,7 +174,7 @@ public void testEmitterProblem() throws Exception {
build()) {
for (int i = 0; i < 2; i++) {
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
- new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 10000, 50000));
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 10000, 50000));
}
}
assertEquals(Collections.emptyList(), emitter.images());
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index afdb15e7272d3..50516fbfccdc3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -54,13 +54,19 @@ public class BrokerRegistrationTest {
Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, (short) 3)),
new SimpleEntry<>("bar", VersionRange.of((short) 1, (short) 4))).collect(
Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
- Optional.of("myrack"), false, true));
+ Optional.of("myrack"), false, true),
+ new BrokerRegistration(3, 0, Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw"),
+ Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
+ Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
+ .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
+ Optional.empty(), false, true, Optional.of(10L)));
@Test
public void testValues() {
assertEquals(0, REGISTRATIONS.get(0).id());
assertEquals(1, REGISTRATIONS.get(1).id());
assertEquals(2, REGISTRATIONS.get(2).id());
+ assertEquals(3, REGISTRATIONS.get(3).id());
}
@Test
@@ -69,9 +75,13 @@ public void testEquals() {
assertNotEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(0));
assertNotEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(2));
assertNotEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(0));
+ assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(0));
+ assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(1));
+ assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(2));
assertEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(0));
assertEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(1));
assertEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(2));
+ assertEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(3));
}
@Test
@@ -80,14 +90,20 @@ public void testToString() {
"incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
- "rack=Optional.empty, fenced=true, inControlledShutdown=false, isMigratingZkBroker=false)",
+ "rack=Optional.empty, fenced=true, inControlledShutdown=false, migratingZkBrokerEpoch=-1)",
REGISTRATIONS.get(1).toString());
assertEquals("BrokerRegistration(id=2, epoch=0, " +
"incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 2-3}, " +
- "rack=Optional[myrack], fenced=false, inControlledShutdown=true, isMigratingZkBroker=false)",
+ "rack=Optional[myrack], fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=-1)",
REGISTRATIONS.get(2).toString());
+ assertEquals("BrokerRegistration(id=3, epoch=0, " +
+ "incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
+ "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+ "host='localhost', port=9093)], supportedFeatures={metadata.version: 7}, " +
+ "rack=Optional.empty, fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=10)",
+ REGISTRATIONS.get(3).toString());
}
@Test
@@ -95,6 +111,7 @@ public void testFromRecordAndToRecord() {
testRoundTrip(REGISTRATIONS.get(0));
testRoundTrip(REGISTRATIONS.get(1));
testRoundTrip(REGISTRATIONS.get(2));
+ testRoundTrip(REGISTRATIONS.get(3));
}
private void testRoundTrip(BrokerRegistration registration) {
@@ -117,5 +134,7 @@ public void testToNode() {
REGISTRATIONS.get(1).node("INTERNAL"));
assertEquals(Optional.of(new Node(2, "localhost", 9092, "myrack")),
REGISTRATIONS.get(2).node("INTERNAL"));
+ assertEquals(Optional.of(new Node(3, "localhost", 9093, null)),
+ REGISTRATIONS.get(3).node("INTERNAL"));
}
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptSnapshotException.java b/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptSnapshotException.java
index 20a071ff3999c..7581444c76709 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptSnapshotException.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/internals/CorruptSnapshotException.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.KafkaException;
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index b9ddb457160fd..064793a8c702b 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -203,6 +203,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
remote_kafka=None,
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
+ quorum_info_provider=None
):
"""
:param context: test context
@@ -262,15 +263,19 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
:param KafkaService remote_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
:param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and remote_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
-
+ :param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
"""
self.zk = zk
self.remote_kafka = remote_kafka
self.allow_zk_with_kraft = allow_zk_with_kraft
- self.quorum_info = quorum.ServiceQuorumInfo(self, context)
+ if quorum_info_provider is None:
+ self.quorum_info = quorum.ServiceQuorumInfo.from_test_context(self, context)
+ else:
+ self.quorum_info = quorum_info_provider(self)
self.controller_quorum = None # will define below if necessary
self.remote_controller_quorum = None # will define below if necessary
+ self.configured_for_zk_migration = False
if num_nodes < 1:
raise Exception("Must set a positive number of nodes: %i" % num_nodes)
@@ -427,6 +432,38 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
self.colocated_nodes_started = 0
self.nodes_to_start = self.nodes
+ def reconfigure_zk_for_migration(self, kraft_quorum):
+ self.configured_for_zk_migration = True
+ self.controller_quorum = kraft_quorum
+
+ # Set the migration properties
+ self.server_prop_overrides.extend([
+ ["zookeeper.metadata.migration.enable", "true"],
+ ["controller.quorum.voters", kraft_quorum.controller_quorum_voters],
+ ["controller.listener.names", kraft_quorum.controller_listener_names]
+ ])
+
+ # Add a port mapping for the controller listener.
+ # This is not added to "advertised.listeners" because of configured_for_zk_migration=True
+ self.port_mappings[kraft_quorum.controller_listener_names] = kraft_quorum.port_mappings.get(kraft_quorum.controller_listener_names)
+
+ def reconfigure_zk_as_kraft(self, kraft_quorum):
+ self.configured_for_zk_migration = True
+
+ # Remove the configs we set in reconfigure_zk_for_migration
+ props = []
+ for prop in self.server_prop_overrides:
+ if not prop[0].startswith("controller"):
+ props.append(prop)
+ self.server_prop_overrides.clear()
+ self.server_prop_overrides.extend(props)
+ del self.port_mappings[kraft_quorum.controller_listener_names]
+
+ # Set the quorum info to remote KRaft
+ self.quorum_info = quorum.ServiceQuorumInfo(quorum.remote_kraft, self)
+ self.remote_controller_quorum = kraft_quorum
+ self.controller_quorum = kraft_quorum
+
def num_kraft_controllers(self, num_nodes_broker_role, controller_num_nodes_override):
if controller_num_nodes_override < 0:
raise Exception("controller_num_nodes_override must not be negative: %i" % controller_num_nodes_override)
@@ -567,7 +604,7 @@ def start_minikdc_if_necessary(self, add_principals=""):
def alive(self, node):
return len(self.pids(node)) > 0
- def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60):
+ def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60, **kwargs):
"""
Start the Kafka broker and wait until it registers its ID in ZooKeeper
Startup will be skipped for any nodes in nodes_to_skip. These nodes can be started later via add_broker
@@ -606,7 +643,9 @@ def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60):
if self.remote_controller_quorum:
self.remote_controller_quorum.start()
- Service.start(self)
+
+ Service.start(self, **kwargs)
+
if self.concurrent_start:
# We didn't wait while starting each individual node, so wait for them all now
for node in self.nodes_to_start:
@@ -764,7 +803,9 @@ def start_cmd(self, node):
return cmd
def controller_listener_name_list(self, node):
- if self.quorum_info.using_zk:
+ if self.quorum_info.using_zk and self.configured_for_zk_migration:
+ return [self.controller_listener_name(self.controller_quorum.controller_security_protocol)]
+ elif self.quorum_info.using_zk:
return []
broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
# Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
@@ -775,7 +816,7 @@ def controller_listener_name_list(self, node):
self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name]
- def start_node(self, node, timeout_sec=60):
+ def start_node(self, node, timeout_sec=60, **kwargs):
if node not in self.nodes_to_start:
return
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py
index d188eecb9beec..499085b2a2ea7 100644
--- a/tests/kafkatest/services/kafka/quorum.py
+++ b/tests/kafkatest/services/kafka/quorum.py
@@ -78,22 +78,21 @@ class ServiceQuorumInfo:
True iff quorum_type==COLOCATED_KRAFT
"""
- def __init__(self, kafka, context):
+ def __init__(self, quorum_type, kafka):
"""
- :param kafka : KafkaService
- The service for which this instance exposes quorum-related
- information
+ :param quorum_type : str
+ The type of quorum being used. Either "ZK", "COLOCATED_KRAFT", or "REMOTE_KRAFT"
:param context : TestContext
The test context within which the this instance and the
given Kafka service is being instantiated
"""
- quorum_type = for_test(context)
if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft:
raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it")
if kafka.remote_kafka and quorum_type != remote_kraft:
raise Exception("Cannot specify a remote Kafka service unless using a remote KRaft metadata quorum (should not happen)")
+
self.kafka = kafka
self.quorum_type = quorum_type
self.using_zk = quorum_type == zk
@@ -102,6 +101,21 @@ def __init__(self, kafka, context):
self.has_controllers = quorum_type == colocated_kraft or kafka.remote_kafka
self.has_brokers_and_controllers = quorum_type == colocated_kraft
+ @staticmethod
+ def from_test_context(kafka, context):
+ """
+ :param kafka : KafkaService
+ The service for which this instance exposes quorum-related
+ information
+ :param context : TestContext
+ The test context within which the this instance and the
+ given Kafka service is being instantiated
+ """
+
+ quorum_type = for_test(context)
+ return ServiceQuorumInfo(quorum_type, kafka)
+
+
class NodeQuorumInfo:
"""
Exposes quorum-related information for a node in a KafkaService
diff --git a/tests/kafkatest/tests/core/zookeeper_migration_test.py b/tests/kafkatest/tests/core/zookeeper_migration_test.py
new file mode 100644
index 0000000000000..257b9f79158e8
--- /dev/null
+++ b/tests/kafkatest/tests/core/zookeeper_migration_test.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from functools import partial
+import time
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.config_property import CLUSTER_ID
+from kafkatest.services.kafka.quorum import remote_kraft, ServiceQuorumInfo, zk
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import DEV_BRANCH
+
+
+class TestMigration(ProduceConsumeValidateTest):
+ def __init__(self, test_context):
+ super(TestMigration, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.partitions = 3
+ self.replication_factor = 3
+
+ # Producer and consumer
+ self.producer_throughput = 1000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def wait_until_rejoin(self):
+ for partition in range(0, self.partitions):
+ wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
+ backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
+
+ def do_migration(self):
+ # Start up KRaft controller in migration mode
+ remote_quorum = partial(ServiceQuorumInfo, remote_kraft)
+ controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
+ allow_zk_with_kraft=True,
+ remote_kafka=self.kafka,
+ server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
+ ["zookeeper.metadata.migration.enable", "true"]],
+ quorum_info_provider=remote_quorum)
+ controller.start()
+
+ self.logger.info("Restarting ZK brokers in migration mode")
+ self.kafka.reconfigure_zk_for_migration(controller)
+
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ self.kafka.start_node(node)
+ self.wait_until_rejoin()
+
+ self.logger.info("Restarting ZK brokers as KRaft brokers")
+ time.sleep(10)
+ self.kafka.reconfigure_zk_as_kraft(controller)
+
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ self.kafka.start_node(node)
+ self.wait_until_rejoin()
+
+ def test_online_migration(self):
+ zk_quorum = partial(ServiceQuorumInfo, zk)
+ self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
+ self.kafka = KafkaService(self.test_context,
+ num_nodes=3,
+ zk=self.zk,
+ version=DEV_BRANCH,
+ quorum_info_provider=zk_quorum,
+ allow_zk_with_kraft=True,
+ server_prop_overrides=[["zookeeper.metadata.migration.enable", "false"]])
+ self.kafka.security_protocol = "PLAINTEXT"
+ self.kafka.interbroker_security_protocol = "PLAINTEXT"
+ self.zk.start()
+
+ self.logger.info("Pre-generating clusterId for ZK.")
+ cluster_id_json = """{"version": "1", "id": "%s"}""" % CLUSTER_ID
+ self.zk.create(path="/cluster")
+ self.zk.create(path="/cluster/id", value=cluster_id_json)
+ self.kafka.start()
+
+ topic_cfg = {
+ "topic": self.topic,
+ "partitions": self.partitions,
+ "replication-factor": self.replication_factor,
+ "configs": {"min.insync.replicas": 2}
+ }
+ self.kafka.create_topic(topic_cfg)
+
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int,
+ compression_types=["none"],
+ version=DEV_BRANCH)
+
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000,
+ message_validator=is_int, version=DEV_BRANCH)
+
+ self.run_produce_consume_validate(core_test_action=self.do_migration)