diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 9a3215f7a5057..2ed177b6fbc55 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -105,6 +105,7 @@ public class SslConfigs { public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm"; public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. "; + public static final String DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "https"; public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = "ssl.secure.random.implementation"; public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_DOC = "The SecureRandom PRNG implementation to use for SSL cryptography operations. "; @@ -134,7 +135,7 @@ public static void addClientSslSupport(ConfigDef config) { .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC) .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC) + .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC) .define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 2df4c4fe90fe9..f5af400dd4429 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -237,23 +237,41 @@ public void testInvalidEndpointIdentification() throws Exception { } /** - * Tests that server certificate with invalid IP address is accepted by + * Tests that server certificate with invalid host name is accepted by * a client that has disabled endpoint validation */ @Test public void testEndpointIdentificationDisabled() throws Exception { - String node = "0"; - String serverHost = InetAddress.getLocalHost().getHostAddress(); + serverCertStores = new CertStores(true, "server", "notahost"); + clientCertStores = new CertStores(false, "client", "localhost"); + sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); + sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); + SecurityProtocol securityProtocol = SecurityProtocol.SSL; - server = new NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol, - new TestSecurityConfig(sslServerConfigs), serverHost, null, null); - server.start(); - sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + server = createEchoServer(SecurityProtocol.SSL); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + + // Disable endpoint validation, connection should succeed + String node = "1"; + sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress(serverHost, server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + + // Disable endpoint validation using null value, connection should succeed + String node2 = "2"; + sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, null); + createSelector(sslClientConfigs); + selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.checkClientConnection(selector, node2, 100, 10); + + // Connection should fail with endpoint validation enabled + String node3 = "3"; + sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); + createSelector(sslClientConfigs); + selector.connect(node3, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.State.AUTHENTICATION_FAILED); + selector.close(); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index d7860ff6618c6..aa677db89a873 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -472,7 +472,7 @@ public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() thro */ @Test public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws Exception { - testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 1); + testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 1); } /** @@ -1466,7 +1466,7 @@ private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtoc private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception { createSelector(securityProtocol, saslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); } diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index c19599d3cc0ef..d08e452f78cf6 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -173,11 +173,13 @@ object ConfigCommand extends Config { private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = { val props = new Properties if (opts.options.has(opts.addConfig)) { - //split by commas, but avoid those in [], then into KV pairs + // Split list by commas, but avoid those in [], then into KV pairs + // Each KV pair is of format key=value, split them into key and value, using -1 as the limit for split() to + // include trailing empty strings. This is to support empty value (e.g. 'ssl.endpoint.identification.algorithm=') val pattern = "(?=[^\\]]*(?:\\[|$))" val configsToBeAdded = opts.options.valueOf(opts.addConfig) .split("," + pattern) - .map(_.split("""\s*=\s*""" + pattern)) + .map(_.split("""\s*=\s*""" + pattern, -1)) require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\".") //Create properties, parsing square brackets from values if necessary configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim)) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a78bb4d406fc2..19bb80749584d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -208,6 +208,7 @@ object Defaults { val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM + val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM val SslClientAuthRequired = "required" val SslClientAuthRequested = "requested" val SslClientAuthNone = "none" @@ -955,7 +956,7 @@ object KafkaConfig { .define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, SslTruststorePasswordDoc) .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc) .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc) - .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc) + .define(SslEndpointIdentificationAlgorithmProp, STRING, Defaults.SslEndpointIdentificationAlgorithm, LOW, SslEndpointIdentificationAlgorithmDoc) .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc) .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc) .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc) diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 66e98f5b1cefa..8238650911ce9 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -137,6 +137,20 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts) assertEquals(1, deletedProps.size) assertEquals("a", deletedProps.head) + + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "x", + "--entity-type", entityType, + "--alter", + "--add-config", "a=b,c=,d=e,f=")) + createOpts.checkArgs() + + val addedProps2 = ConfigCommand.parseConfigsToBeAdded(createOpts) + assertEquals(4, addedProps2.size()) + assertEquals("b", addedProps2.getProperty("a")) + assertEquals("e", addedProps2.getProperty("d")) + assertTrue(addedProps2.getProperty("c").isEmpty) + assertTrue(addedProps2.getProperty("f").isEmpty) } @Test(expected = classOf[IllegalArgumentException]) diff --git a/docs/security.html b/docs/security.html index 4fcbdad72a5a9..06dd8fbd1fc03 100644 --- a/docs/security.html +++ b/docs/security.html @@ -52,11 +52,29 @@

7.2 Encryption and Authentication
  • validity: the valid time of the certificate in days.

  • - Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property: + +
    Configuring Host Name Verification
    + From Kafka version 2.0.0 onwards, host name verification of servers is enabled by default for client connections + as well as inter-broker connections to prevent man-in-the-middle attacks. Server host name verification may be disabled + by setting ssl.endpoint.identification.algorithm to an empty string. For example, +
    	ssl.endpoint.identification.algorithm=
    + For dynamically configured broker listeners, hostname verification may be disabled using kafka-configs.sh. + For example, +
    +        bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
    +        
    + + For older versions of Kafka, ssl.endpoint.identification.algorithm is not defined by default, so host name + verification is not performed. The property should be set to HTTPS to enable host name verification.
    	ssl.endpoint.identification.algorithm=HTTPS 
    - Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields: + Host name verification must be enabled to prevent man-in-the-middle attacks if server endpoints are not validated + externally. + +

    Configuring Host Name In Certificates
    + If host name verification is enabled, clients will verify the server's fully qualified domain name (FQDN) against one of + the following two fields:
    1. Common Name (CN)
    2. Subject Alternative Name (SAN) diff --git a/docs/upgrade.html b/docs/upgrade.html index 056fb8366e44f..a399a620f40bb 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -67,6 +67,7 @@
      Notable changes in 2
      • KIP-186 increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config offsets.retention.minutes to 1440.
      • Support for Java 7 has been dropped, Java 8 is now the minimum version required.
      • +
      • The default value for ssl.endpoint.identification.algorithm was changed to https, which performs hostname verification (man-in-the-middle attacks are possible otherwise). Set ssl.endpoint.identification.algorithm to an empty string to restore the previous behaviour.
      • KAFKA-5674 extends the lower interval of max.connections.per.ip minimum to zero and therefore allows IP-based filtering of inbound connections.
      • KIP-272 added API version tag to the metric kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}. diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 55986bb5820f5..c884320fc1c6e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -87,7 +87,7 @@ public KafkaEmbedded(final Properties config, final MockTime time) throws IOExce private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException { final Properties effectiveConfig = new Properties(); effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0); - effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1"); + effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "localhost"); effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092"); effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1); effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); @@ -100,7 +100,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) throws IO } /** - * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. + * This broker's `metadata.broker.list` value. Example: `localhost:9092`. *

        * You can use this to tell Kafka producers and consumers how to connect to this instance. */