Skip to content

Commit

Permalink
MINOR; Fix incompatible change to the kafka config (#16464)
Browse files Browse the repository at this point in the history
Prior to KIP-853, users were not allow to enumerate listeners specified in `controller.listener.names` in the `advertised.listeners`. This decision was made in 3.3 because the `controller.quorum.voters` property is in effect the list of advertised listeners for all of the controllers.

KIP-853 is moving away from `controller.quorum.voters` in favor of a dynamic set of voters. This means that the user needs to have a way of specifying the advertised listeners for controller.

This change allows the users to specify listener names in `controller.listener.names` in `advertised.listeners`. To make this change forwards compatible (use a valid configuration from 3.8 in 3.9), the controller's advertised listeners are going to get computed by looking up the endpoint in `advertised.listeners`. If it doesn't exist, the controller will look up the endpoint in the `listeners` configuration.

This change also includes a fix the to the BeginQuorumEpoch request where the default value for VoterId was 0 instead of -1.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
jsancio authored Jun 28, 2024
1 parent ebaa108 commit 9be27e7
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null"},
{ "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "ignorable": true,
"about": "The voter ID of the receiving replica" },
{ "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId",
"about": "The replica id of the voter receiving the request" },
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedListeners.map(_.toJava).asJava).
config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))

Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val oldConfig = server.config
val newListeners = listenersToMap(newConfig.listeners)
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
Expand All @@ -1093,8 +1093,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi

// Currently, we do not support adding or removing listeners when in KRaft mode.
// However, we support changing other listener configurations (max connections, etc.)
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
listenersToMap(newConfig.effectiveAdvertisedListeners))) {
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported()
}
}
Expand All @@ -1111,8 +1111,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
}
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
listenersToMap(newConfig.effectiveAdvertisedListeners))) {
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported()
server match {
case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
Expand Down
38 changes: 24 additions & 14 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,17 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}

def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
// Only expose controller listeners
advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
val controllerListenersValue = controllerListeners

controllerListenerNames.flatMap { name =>
controllerAdvertisedListeners
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
.orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))))
}
}

def effectiveAdvertisedListeners: Seq[EndPoint] = {
def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
// Only expose broker listeners
advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}
Expand Down Expand Up @@ -919,7 +925,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
" to prevent frequent changes in ISR")
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet

// validate KRaft-related configs
val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
Expand All @@ -938,7 +944,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
}
def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
require(advertisedListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())),
require(advertisedBrokerListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())),
s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
}
def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
Expand All @@ -955,7 +961,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
}
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
require(advertisedListenerNames.nonEmpty,
require(advertisedBrokerListenerNames.nonEmpty,
"There must be at least one advertised listener." + (
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
}
Expand Down Expand Up @@ -992,7 +998,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
validateControlPlaneListenerEmptyForKRaft()
// listeners should only contain listeners also enumerated in the controller listener
require(
effectiveAdvertisedListeners.isEmpty,
effectiveAdvertisedControllerListeners.size == listeners.size,
s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
)
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
Expand Down Expand Up @@ -1032,25 +1038,29 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
validateAdvertisedListenersNonEmptyForBroker()
require(advertisedListenerNames.contains(interBrokerListenerName),
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
require(advertisedListenerNames.subsetOf(listenerNames),
s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
require(advertisedBrokerListenerNames.subsetOf(listenerNames),
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " +
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}"
)
}

require(!effectiveAdvertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
require(!effectiveAdvertisedBrokerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")

require(!effectiveAdvertisedControllerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")

// validate control.plane.listener.name config
if (controlPlaneListenerName.isDefined) {
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get),
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
// controlPlaneListenerName should be different from interBrokerListenerName
require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " +
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ class KafkaServer(
raftManager.startup()

val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep =>
config.effectiveAdvertisedBrokerListeners.foreach { ep =>
networkListeners.add(new Listener().
setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
setName(ep.listenerName.value()).
Expand Down Expand Up @@ -752,14 +752,14 @@ class KafkaServer(
}

def createBrokerInfo: BrokerInfo = {
val endPoints = config.effectiveAdvertisedListeners.map(e => s"${e.host}:${e.port}")
val endPoints = config.effectiveAdvertisedBrokerListeners.map(e => s"${e.host}:${e.port}")
zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
s" advertised listeners are already registered by broker ${broker.id}")
}

val listeners = config.effectiveAdvertisedListeners.map { endpoint =>
val listeners = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
else
Expand Down Expand Up @@ -1107,7 +1107,7 @@ class KafkaServer(

/** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */
def advertisedListeners: Seq[EndPoint] = {
config.effectiveAdvertisedListeners.map { endPoint =>
config.effectiveAdvertisedBrokerListeners.map { endPoint =>
endPoint.copy(port = boundPort(endPoint.listenerName))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
val newListeners = server.config.effectiveAdvertisedListeners.map { e =>
val newListeners = server.config.effectiveAdvertisedBrokerListeners.map { e =>
if (e.listenerName.value == SecureExternal)
s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
else
Expand All @@ -1527,7 +1527,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
adminClient.alterConfigs(configs).all.get
servers.foreach { server =>
TestUtils.retry(10000) {
val externalListener = server.config.effectiveAdvertisedListeners.find(_.listenerName.value == SecureExternal)
val externalListener = server.config.effectiveAdvertisedBrokerListeners.find(_.listenerName.value == SecureExternal)
.getOrElse(throw new IllegalStateException("External listener not found"))
assertEquals(newHost, externalListener.host, "Config not updated")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class SocketServerTest {
val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config)

val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint =>
val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava)

Expand Down
Loading

0 comments on commit 9be27e7

Please sign in to comment.