From 9be27e715a209a892941bf35e66859d9c39c28c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Thu, 27 Jun 2024 21:24:25 -0400 Subject: [PATCH] MINOR; Fix incompatible change to the kafka config (#16464) Prior to KIP-853, users were not allow to enumerate listeners specified in `controller.listener.names` in the `advertised.listeners`. This decision was made in 3.3 because the `controller.quorum.voters` property is in effect the list of advertised listeners for all of the controllers. KIP-853 is moving away from `controller.quorum.voters` in favor of a dynamic set of voters. This means that the user needs to have a way of specifying the advertised listeners for controller. This change allows the users to specify listener names in `controller.listener.names` in `advertised.listeners`. To make this change forwards compatible (use a valid configuration from 3.8 in 3.9), the controller's advertised listeners are going to get computed by looking up the endpoint in `advertised.listeners`. If it doesn't exist, the controller will look up the endpoint in the `listeners` configuration. This change also includes a fix the to the BeginQuorumEpoch request where the default value for VoterId was 0 instead of -1. Reviewers: Colin P. McCabe --- .../message/BeginQuorumEpochRequest.json | 4 +- .../scala/kafka/server/BrokerServer.scala | 2 +- .../kafka/server/DynamicBrokerConfig.scala | 10 +-- .../main/scala/kafka/server/KafkaConfig.scala | 38 +++++++---- .../main/scala/kafka/server/KafkaServer.scala | 8 +-- .../DynamicBrokerReconfigurationTest.scala | 4 +- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 67 +++++++++++++++++-- .../server/RegistrationTestContext.scala | 2 +- .../scala/unit/kafka/utils/TestUtils.scala | 2 +- .../apache/kafka/raft/KafkaRaftClient.java | 10 +-- 11 files changed, 107 insertions(+), 42 deletions(-) diff --git a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json index 1e956a1fc9454..9302bd603d6e7 100644 --- a/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json +++ b/clients/src/main/resources/common/message/BeginQuorumEpochRequest.json @@ -24,8 +24,8 @@ "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null"}, - { "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "ignorable": true, - "about": "The voter ID of the receiving replica" }, + { "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId", + "about": "The replica id of the voter receiving the request" }, { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 29f3a5cc0b086..b962d49e9c411 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -255,7 +255,7 @@ class BrokerServer( clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()), - config.effectiveAdvertisedListeners.map(_.toJava).asJava). + config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava). withWildcardHostnamesResolved(). withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 96566826b0432..d00f682cafd5b 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1069,7 +1069,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi def validateReconfiguration(newConfig: KafkaConfig): Unit = { val oldConfig = server.config val newListeners = listenersToMap(newConfig.listeners) - val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners) + val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners) val oldListeners = listenersToMap(oldConfig.listeners) if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'") @@ -1093,8 +1093,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi // Currently, we do not support adding or removing listeners when in KRaft mode. // However, we support changing other listener configurations (max connections, etc.) - if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners), - listenersToMap(newConfig.effectiveAdvertisedListeners))) { + if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners), + listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) { verifyListenerRegistrationAlterationSupported() } } @@ -1111,8 +1111,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved) if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded) } - if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners), - listenersToMap(newConfig.effectiveAdvertisedListeners))) { + if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners), + listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) { verifyListenerRegistrationAlterationSupported() server match { case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b436ec974aebc..f230140c750f0 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -789,11 +789,17 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } def effectiveAdvertisedControllerListeners: Seq[EndPoint] = { - // Only expose controller listeners - advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value())) + val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value())) + val controllerListenersValue = controllerListeners + + controllerListenerNames.flatMap { name => + controllerAdvertisedListeners + .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))) + .orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))) + } } - def effectiveAdvertisedListeners: Seq[EndPoint] = { + def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = { // Only expose broker listeners advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) } @@ -919,7 +925,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami " to prevent frequent changes in ISR") require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor, "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") - val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet + val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet // validate KRaft-related configs val voterIds = QuorumConfig.parseVoterIds(quorumVoters) @@ -938,7 +944,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.") } def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = { - require(advertisedListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())), + require(advertisedBrokerListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())), s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.") } def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = { @@ -955,7 +961,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role") } def validateAdvertisedListenersNonEmptyForBroker(): Unit = { - require(advertisedListenerNames.nonEmpty, + require(advertisedBrokerListenerNames.nonEmpty, "There must be at least one advertised listener." + ( if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else "")) } @@ -992,7 +998,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami validateControlPlaneListenerEmptyForKRaft() // listeners should only contain listeners also enumerated in the controller listener require( - effectiveAdvertisedListeners.isEmpty, + effectiveAdvertisedControllerListeners.size == listeners.size, s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller" ) validateControllerQuorumVotersMustContainNodeIdForKRaftController() @@ -1032,25 +1038,29 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) validateAdvertisedListenersNonEmptyForBroker() - require(advertisedListenerNames.contains(interBrokerListenerName), + require(advertisedBrokerListenerNames.contains(interBrokerListenerName), s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + - s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") - require(advertisedListenerNames.subsetOf(listenerNames), + s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}") + require(advertisedBrokerListenerNames.subsetOf(listenerNames), s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " + - s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + + s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + s"are ${listenerNames.map(_.value).mkString(",")}" ) } - require(!effectiveAdvertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"), + require(!effectiveAdvertisedBrokerListeners.exists(endpoint => endpoint.host=="0.0.0.0"), + s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+ + s"Use a routable IP address.") + + require(!effectiveAdvertisedControllerListeners.exists(endpoint => endpoint.host=="0.0.0.0"), s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+ s"Use a routable IP address.") // validate control.plane.listener.name config if (controlPlaneListenerName.isDefined) { - require(advertisedListenerNames.contains(controlPlaneListenerName.get), + require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get), s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + - s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") + s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}") // controlPlaneListenerName should be different from interBrokerListenerName require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()), s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " + diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index be2d81ef17980..477774f0166b4 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -462,7 +462,7 @@ class KafkaServer( raftManager.startup() val networkListeners = new ListenerCollection() - config.effectiveAdvertisedListeners.foreach { ep => + config.effectiveAdvertisedBrokerListeners.foreach { ep => networkListeners.add(new Listener(). setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host). setName(ep.listenerName.value()). @@ -752,14 +752,14 @@ class KafkaServer( } def createBrokerInfo: BrokerInfo = { - val endPoints = config.effectiveAdvertisedListeners.map(e => s"${e.host}:${e.port}") + val endPoints = config.effectiveAdvertisedBrokerListeners.map(e => s"${e.host}:${e.port}") zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker => val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints) require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" + s" advertised listeners are already registered by broker ${broker.id}") } - val listeners = config.effectiveAdvertisedListeners.map { endpoint => + val listeners = config.effectiveAdvertisedBrokerListeners.map { endpoint => if (endpoint.port == 0) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) else @@ -1107,7 +1107,7 @@ class KafkaServer( /** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */ def advertisedListeners: Seq[EndPoint] = { - config.effectiveAdvertisedListeners.map { endPoint => + config.effectiveAdvertisedBrokerListeners.map { endPoint => endPoint.copy(port = boundPort(endPoint.listenerName)) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 8508607eefde5..df39d23ea7a2c 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -1515,7 +1515,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = { val configs = servers.map { server => val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) - val newListeners = server.config.effectiveAdvertisedListeners.map { e => + val newListeners = server.config.effectiveAdvertisedBrokerListeners.map { e => if (e.listenerName.value == SecureExternal) s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}" else @@ -1527,7 +1527,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup adminClient.alterConfigs(configs).all.get servers.foreach { server => TestUtils.retry(10000) { - val externalListener = server.config.effectiveAdvertisedListeners.find(_.listenerName.value == SecureExternal) + val externalListener = server.config.effectiveAdvertisedBrokerListeners.find(_.listenerName.value == SecureExternal) .getOrElse(throw new IllegalStateException("External listener not found")) assertEquals(newHost, externalListener.host, "Config not updated") } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 96ca34e8c2995..a22d8180bbcfb 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -367,7 +367,7 @@ class SocketServerTest { val config = KafkaConfig.fromProps(testProps) val testableServer = new TestableSocketServer(config) - val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint => + val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint => endpoint.copy(port = testableServer.boundPort(endpoint.listenerName)) }.map(_.toJava) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 43a7136d4ce24..d0f0b6e87702e 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -165,7 +165,7 @@ class KafkaConfigTest { props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$port") val serverConfig = KafkaConfig.fromProps(props) - val endpoints = serverConfig.effectiveAdvertisedListeners + val endpoints = serverConfig.effectiveAdvertisedBrokerListeners assertEquals(1, endpoints.size) val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, hostName) @@ -181,7 +181,7 @@ class KafkaConfigTest { props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort") val serverConfig = KafkaConfig.fromProps(props) - val endpoints = serverConfig.effectiveAdvertisedListeners + val endpoints = serverConfig.effectiveAdvertisedBrokerListeners val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, advertisedHostName) @@ -274,7 +274,7 @@ class KafkaConfigTest { assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol) //advertised listener should contain control-plane listener - val advertisedEndpoints = serverConfig.effectiveAdvertisedListeners + val advertisedEndpoints = serverConfig.effectiveAdvertisedBrokerListeners assertTrue(advertisedEndpoints.exists { endpoint => endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value()) }) @@ -359,6 +359,59 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) } + @Test + def testEffectAdvertiseControllerListenerForControllerWithAdvertisement(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "CONTROLLER://lb1.example.com:9000") + + val config = KafkaConfig.fromProps(props) + assertEquals( + Seq(EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), + config.effectiveAdvertisedControllerListeners + ) + } + + @Test + def testEffectAdvertiseControllerListenerForControllerWithoutAdvertisement(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + + val config = KafkaConfig.fromProps(props) + assertEquals( + Seq(EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), + config.effectiveAdvertisedControllerListeners + ) + } + + @Test + def testEffectAdvertiseControllerListenerForControllerWithMixedAdvertisement(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093,CONTROLLER_NEW://localhost:9094") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER,CONTROLLER_NEW") + props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "CONTROLLER://lb1.example.com:9000") + + val config = KafkaConfig.fromProps(props) + assertEquals( + Seq( + EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT), + EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT) + ), + config.effectiveAdvertisedControllerListeners + ) + } + @Test def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = { val props = new Properties() @@ -492,7 +545,7 @@ class KafkaConfigTest { EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL), EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)) assertEquals(expectedListeners, config.listeners) - assertEquals(expectedListeners, config.effectiveAdvertisedListeners) + assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners) val expectedSecurityProtocolMap = Map( new ListenerName("CLIENT") -> SecurityProtocol.SSL, new ListenerName("REPLICATION") -> SecurityProtocol.SSL, @@ -523,7 +576,7 @@ class KafkaConfigTest { EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) ) - assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedListeners) + assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedBrokerListeners) val expectedSecurityProtocolMap = Map( new ListenerName("EXTERNAL") -> SecurityProtocol.SSL, @@ -591,7 +644,7 @@ class KafkaConfigTest { val conf = KafkaConfig.fromProps(props) assertEquals(listenerListToEndPoints("PLAINTEXT://:9092"), conf.listeners) assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host) - assertEquals(conf.effectiveAdvertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092")) + assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092")) } @nowarn("cat=deprecation") @@ -1220,7 +1273,7 @@ class KafkaConfigTest { assertEquals(false, config.brokerIdGenerationEnable) assertEquals(1, config.maxReservedBrokerId) assertEquals(1, config.brokerId) - assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedListeners.map(_.connectionString)) + assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString)) assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) assertEquals(List("/tmp1", "/tmp2"), config.logDirs) assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) diff --git a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala index 92e318f4412cd..8f00f0aa1c1ba 100644 --- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala +++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala @@ -64,7 +64,7 @@ class RegistrationTestContext( val clusterId = "x4AJGXQSRnephtTZzujw4w" val advertisedListeners = new ListenerCollection() val controllerEpoch = new AtomicInteger(123) - config.effectiveAdvertisedListeners.foreach { ep => + config.effectiveAdvertisedBrokerListeners.foreach { ep => advertisedListeners.add(new Listener().setHost(ep.host). setName(ep.listenerName.value()). setPort(ep.port.shortValue()). diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c4bb10b151cbc..2842d239c0165 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -237,7 +237,7 @@ object TestUtils extends Logging { listenerName: ListenerName ): String = { brokers.map { s => - val listener = s.config.effectiveAdvertisedListeners.find(_.listenerName == listenerName).getOrElse( + val listener = s.config.effectiveAdvertisedBrokerListeners.find(_.listenerName == listenerName).getOrElse( sys.error(s"Could not find listener with name ${listenerName.value}")) formatAddress(listener.host, s.boundPort(listenerName)) }.mkString(",") diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 2e47eebadcaa0..23e0d96a52b40 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2009,10 +2009,12 @@ private Optional validateVoterOnlyRequest(int remoteNodeId, int requestE */ private boolean isValidVoterKey(Optional voterKey) { return voterKey - .map(key -> - OptionalInt.of(key.id()).equals(nodeId) && - key.directoryId().equals(Optional.of(nodeDirectoryId)) - ) + .map(key -> { + if (!OptionalInt.of(key.id()).equals(nodeId)) return false; + if (!key.directoryId().isPresent()) return true; + + return key.directoryId().get().equals(nodeDirectoryId); + }) .orElse(true); } /**