Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
859aa11
KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in K…
akhileshchg Dec 15, 2022
fcde47a
add missing files
akhileshchg Dec 15, 2022
35c12dc
Merge remote-tracking branch 'origin/trunk' into migration_driver_stub
mumrah Dec 15, 2022
13c82df
MINOR: ControllerServer should use the new metadata loader and snapsh…
mumrah Dec 15, 2022
5221e33
Register the migration driver with the controller
mumrah Dec 15, 2022
73a6af9
Merge remote-tracking branch 'origin/trunk' into migration_driver_stub
mumrah Dec 16, 2022
88aa6d7
Fix build issue and add helper functions to cluster image and delta
akhileshchg Dec 16, 2022
d52cd95
WIP
mumrah Dec 16, 2022
df21677
RPC support from KRaft controller to brokers
akhileshchg Dec 19, 2022
7da1d1f
Progress on KRaft to ZK dual write
mumrah Dec 19, 2022
032aa3f
Merge remote-tracking branch 'akhilesh/migration_driver_stub' into mi…
mumrah Dec 19, 2022
543324a
Merge remote-tracking branch 'origin/trunk' into migration_driver_stub
mumrah Dec 19, 2022
c38c7df
Progress on end-to-end topic creation
mumrah Dec 20, 2022
d4226dc
Send ZK broker epoch in registration
mumrah Dec 20, 2022
69e87f9
track ZK controller epoch and version
mumrah Dec 20, 2022
de66d21
Add better logging for migration
mumrah Dec 20, 2022
e526d10
Merge remote-tracking branch 'origin/trunk' into migration_driver_stub
mumrah Dec 20, 2022
b93409c
fixup after merge
mumrah Dec 20, 2022
1f874b9
Add system test for online migration
mumrah Dec 22, 2022
4634fcc
Send ZK broker epoch in registration
mumrah Dec 27, 2022
c9fb0e6
Change the state machine a bit, get system test working
mumrah Dec 27, 2022
2f13655
Progress on end-to-end test
mumrah Jan 3, 2023
3bd173d
Merge remote-tracking branch 'upstream/trunk' into migration_driver_stub
akhileshchg Jan 3, 2023
8167ea5
Use the correct epoch on ZK broker
mumrah Jan 3, 2023
4352a4b
Merge remote-tracking branch 'akhilesh/migration_driver_stub' into mi…
mumrah Jan 4, 2023
4660527
wip on epoch
mumrah Jan 4, 2023
4dbb5a3
WIP on end-to-end test
mumrah Jan 4, 2023
e4a7342
Merge remote-tracking branch 'upstream/trunk' into migration_driver_stub
akhileshchg Jan 5, 2023
dacff92
end to end test is passing
mumrah Jan 5, 2023
9e2d4a9
Merge remote-tracking branch 'akhilesh/migration_driver_stub' into mi…
mumrah Jan 5, 2023
cd5141c
Address comments
akhileshchg Jan 5, 2023
028b3e6
Fixup after merge from trunk
mumrah Jan 5, 2023
1f937b0
Merge remote-tracking branch 'akhilesh/migration_driver_stub' into mi…
mumrah Jan 5, 2023
c39b37d
Fix un-thread-safe access to QC fields, fix some variable names
cmccabe Jan 6, 2023
960aed5
fix checkstyle
cmccabe Jan 6, 2023
358f535
Revert default exception handler
mumrah Jan 6, 2023
c5aa43b
Pin metadata.version to 3.4, fix some tests
mumrah Jan 6, 2023
b1e2c19
Merge remote-tracking branch 'akhilesh/migration_driver_stub' into mi…
mumrah Jan 6, 2023
9d051dd
Fix failing integration tests
mumrah Jan 8, 2023
c1ddfca
Close ZK client before KRaftMigrationDriver in shutdown
mumrah Jan 8, 2023
894a917
fix checkstyle
cmccabe Jan 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ project(':core') {

task genTopicConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'kafka.log.LogConfig'
mainClass = 'org.apache.kafka.server.log.internals.LogConfig'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "topic_config.html").newOutputStream()
}
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<allow pkg="kafka.network"/>
<allow pkg="kafka.api"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.zk" />
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="integration.kafka.server" class="IntegrationTestHelper"/>
<subpackage name="annotation">
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Builder(BrokerRegistrationRequestData data) {

@Override
public short oldestAllowedVersion() {
if (data.isMigratingZkBroker()) {
if (data.migratingZkBrokerEpoch() != -1) {
return (short) 1;
} else {
return (short) 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
{
"apiKey":62,
"type": "request",
Expand Down Expand Up @@ -51,7 +52,7 @@
},
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The rack which this broker is in." },
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
"about": "Set by a ZK broker if the required configurations for ZK migration are present." }
{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1",
"about": "If the required configurations for ZK migration are present, this value is set to the ZK broker epoch" }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
{
"apiKey": 62,
"type": "response",
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.text.NumberFormat
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
import kafka.metrics.KafkaMetricsGroup

import kafka.server.FetchDataInfo
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.{KafkaException, TopicPartition}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ sealed class MigrationControllerChannelContext(
}

override val liveBrokerIdAndEpochs: collection.Map[Int, Long] = {
image.cluster().zkBrokers().asScala.map {
image.cluster().brokers().asScala.map {
case (brokerId, broker) => brokerId.intValue() -> broker.epoch()
}
}

override val liveOrShuttingDownBrokers: collection.Set[Broker] = {
image.cluster().zkBrokers().asScala.values.map { registration =>
image.cluster().brokers().asScala.values.map { registration =>
Broker.fromBrokerRegistration(registration)
}.toSet
}
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/migration/MigrationPropagator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import scala.jdk.CollectionConverters._

class MigrationPropagator(
nodeId: Int,
config: KafkaConfig,
metadataVersionProvider: () => MetadataVersion,
config: KafkaConfig
) extends LegacyPropagator {
@volatile private var _image = MetadataImage.EMPTY
@volatile private var metadataVersion = MetadataVersion.IBP_3_4_IV0
val stateChangeLogger = new StateChangeLogger(nodeId, inControllerContext = true, None)
val channelManager = new ControllerChannelManager(
() => _image.highestOffsetAndEpoch().epoch(),
Expand All @@ -48,7 +48,7 @@ class MigrationPropagator(
val requestBatch = new MigrationPropagatorBatch(
config,
metadataProvider,
metadataVersionProvider,
() => metadataVersion,
channelManager,
stateChangeLogger
)
Expand Down Expand Up @@ -201,4 +201,8 @@ class MigrationPropagator(
// Clears any pending requests accumulated in the propagator's request batch.
override def clear(): Unit = {
requestBatch.clear()
}

// Updates the metadata version used when building requests for ZK-mode
// brokers; stored in the @volatile `metadataVersion` field read by the
// request batch's supplier.
override def setMetadataVersion(newMetadataVersion: MetadataVersion): Unit = {
metadataVersion = newMetadataVersion
}
}
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: Option[String],
val isZkBroker: Boolean = false
val isZkBroker: Boolean,
val zkBrokerEpochSupplier: () => Long
) extends Logging {

val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
Expand Down Expand Up @@ -271,7 +272,7 @@ class BrokerLifecycleManager(
_advertisedListeners = advertisedListeners.duplicate()
_supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
if (!isZkBroker) {
// ZK brokers don't block on registration during startup
// Only KRaft brokers block on registration during startup
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
Expand All @@ -290,9 +291,20 @@ class BrokerLifecycleManager(
setMinSupportedVersion(range.min()).
setMaxSupportedVersion(range.max()))
}
val migrationZkBrokerEpoch: Long = {
if (isZkBroker) {
val zkBrokerEpoch: Long = Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
if (zkBrokerEpoch < 0) {
throw new IllegalStateException("Trying to sending BrokerRegistration in migration Zk " +
"broker without valid zk broker epoch")
}
zkBrokerEpoch
} else
-1
}
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setIsMigratingZkBroker(isZkBroker).
setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,16 @@ class RawMetaProperties(val props: Properties = new Properties()) {

object MetaProperties {
def parse(properties: RawMetaProperties): MetaProperties = {
properties.requireVersion(expectedVersion = 1)
val clusterId = require(ClusterIdKey, properties.clusterId)
val nodeId = require(NodeIdKey, properties.nodeId)
new MetaProperties(clusterId, nodeId)
if (properties.version == 1) {
val nodeId = require(NodeIdKey, properties.nodeId)
new MetaProperties(clusterId, nodeId)
} else if (properties.version == 0) {
val brokerId = require(BrokerIdKey, properties.brokerId)
new MetaProperties(clusterId, brokerId)
} else {
throw new RuntimeException(s"Expected version 0 or 1, but got version ${properties.version}")
}
}

def require[T](key: String, value: Option[T]): T = {
Expand Down Expand Up @@ -182,7 +188,8 @@ object BrokerMetadataCheckpoint extends Logging {
if (brokerMetadataMap.isEmpty) {
(new RawMetaProperties(), offlineDirs)
} else {
val numDistinctMetaProperties = brokerMetadataMap.values.toSet.size
val parsedProperties = brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props)))
val numDistinctMetaProperties = parsedProperties.toSet.size
if (numDistinctMetaProperties > 1) {
val builder = new StringBuilder

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ class BrokerServer(

lifecycleManager = new BrokerLifecycleManager(config,
time,
threadNamePrefix)
threadNamePrefix,
isZkBroker = false,
() => -1)

/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
Expand Down
61 changes: 52 additions & 9 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,58 @@

package kafka.server

import java.util.OptionalLong
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.cluster.Broker.ServerInfo
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}

import scala.jdk.CollectionConverters._
import java.util.OptionalLong
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._


/**
 * Holds the components a KRaft controller creates when ZooKeeper-to-KRaft
 * migration is enabled: the ZooKeeper client, the migration driver, and the
 * RPC client used to propagate metadata to ZK-mode brokers.
 */
case class ControllerMigrationSupport(
zkClient: KafkaZkClient,
migrationDriver: KRaftMigrationDriver,
brokersRpcClient: LegacyPropagator
) {
// Shuts down the migration components. The ZK client is closed before the
// migration driver, and each close/shutdown is swallowed (logged via
// CoreUtils.swallow) so that one failure does not stop the others.
def shutdown(logging: Logging): Unit = {
if (zkClient != null) {
CoreUtils.swallow(zkClient.close(), logging)
}
if (brokersRpcClient != null) {
CoreUtils.swallow(brokersRpcClient.shutdown(), logging)
}
if (migrationDriver != null) {
CoreUtils.swallow(migrationDriver.close(), logging)
}
}
}

/**
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
Expand Down Expand Up @@ -81,6 +103,7 @@ class ControllerServer(
var quotaManagers: QuotaManagers = _
var controllerApis: ControllerApis = _
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
var migrationSupport: Option[ControllerMigrationSupport] = None

private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
Expand Down Expand Up @@ -112,7 +135,6 @@ class ControllerServer(
maybeChangeStatus(STARTING, STARTED)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()


newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)

Expand Down Expand Up @@ -217,6 +239,26 @@ class ControllerServer(
doRemoteKraftSetup()
}

if (config.migrationEnabled) {
val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
val migrationClient = new ZkMigrationClient(zkClient)
val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
val migrationDriver = new KRaftMigrationDriver(
config.nodeId,
controller.asInstanceOf[QuorumController].zkRecordConsumer(),
migrationClient,
propagator,
publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)),
sharedServer.faultHandlerFactory.build(
"zk migration",
fatal = false,
() => {}
)
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
}

quotaManagers = QuotaFactory.instantiate(config,
metrics,
time,
Expand Down Expand Up @@ -267,6 +309,7 @@ class ControllerServer(
sharedServer.ensureNotRaftLeader()
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
migrationSupport.foreach(_.shutdown(this))
if (controller != null)
controller.beginShutdown()
if (socketServer != null)
Expand Down
19 changes: 4 additions & 15 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]

authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(zkSupport, leaderAndIsrRequest.brokerEpoch)) {
if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received LeaderAndIsr request with broker epoch ${leaderAndIsrRequest.brokerEpoch} " +
Expand All @@ -289,7 +289,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(zkSupport, stopReplicaRequest.brokerEpoch)) {
if (zkSupport.isBrokerEpochStale(stopReplicaRequest.brokerEpoch, stopReplicaRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received StopReplica request with broker epoch ${stopReplicaRequest.brokerEpoch} " +
Expand Down Expand Up @@ -349,7 +349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val updateMetadataRequest = request.body[UpdateMetadataRequest]

authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(zkSupport, updateMetadataRequest.brokerEpoch)) {
if (zkSupport.isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received UpdateMetadata request with broker epoch ${updateMetadataRequest.brokerEpoch} " +
Expand Down Expand Up @@ -3176,7 +3176,7 @@ class KafkaApis(val requestChannel: RequestChannel,
describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
metadataSupport match {
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache) =>
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)

val entriesData = result.iterator.map { case (quotaEntity, quotaValues) =>
Expand Down Expand Up @@ -3528,17 +3528,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}

// Returns true when the broker epoch carried in a control request
// (LeaderAndIsr/UpdateMetadata/StopReplica) is older than this broker's
// epoch as known to the ZK controller, i.e. the request was intended for a
// previous generation of this broker and should be ignored by the caller.
private def isBrokerEpochStale(zkSupport: ZkSupport, brokerEpochInRequest: Long): Boolean = {
// Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
brokerEpochInRequest < zkSupport.controller.brokerEpoch
}
}
}

object KafkaApis {
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
if (migrationEnabled) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value. " +
s"`${KafkaConfig.ZkConnectProp}` is required because `${KafkaConfig.MigrationEnabledProp} is set to true.")
}
}
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
Expand Down Expand Up @@ -2193,7 +2199,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
if (migrationEnabled) {
validateNonEmptyQuorumVotersForKRaft()
require(controllerListenerNames.nonEmpty,
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZK migration mode: ${controllerListenerNames.asJava}")
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
s"'${KafkaConfig.InterBrokerProtocolVersionProp}' to 3.4 or higher")
} else {
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
Expand Down
Loading