Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class SslConfigs {

public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. ";
public static final String DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "https";

public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = "ssl.secure.random.implementation";
public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_DOC = "The SecureRandom PRNG implementation to use for SSL cryptography operations. ";
Expand Down Expand Up @@ -134,7 +135,7 @@ public static void addClientSslSupport(ConfigDef config) {
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,23 +237,41 @@ public void testInvalidEndpointIdentification() throws Exception {
}

/**
* Tests that server certificate with invalid IP address is accepted by
* Tests that server certificate with invalid host name is accepted by
* a client that has disabled endpoint validation
*/
@Test
public void testEndpointIdentificationDisabled() throws Exception {
String node = "0";
String serverHost = InetAddress.getLocalHost().getHostAddress();
serverCertStores = new CertStores(true, "server", "notahost");
clientCertStores = new CertStores(false, "client", "localhost");
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);

SecurityProtocol securityProtocol = SecurityProtocol.SSL;
server = new NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol,
new TestSecurityConfig(sslServerConfigs), serverHost, null, null);
server.start();
sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
server = createEchoServer(SecurityProtocol.SSL);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());

// Disable endpoint validation, connection should succeed
String node = "1";
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the value is set to null instead of empty string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty string and null are handled in the same way, updated test to verify that as well.

createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress(serverHost, server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.checkClientConnection(selector, node, 100, 10);

// Disable endpoint validation using null value, connection should succeed
String node2 = "2";
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, null);
createSelector(sslClientConfigs);
selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);

// Connection should fail with endpoint validation enabled
String node3 = "3";
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
createSelector(sslClientConfigs);
selector.connect(node3, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.State.AUTHENTICATION_FAILED);
selector.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() thro
*/
@Test
public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws Exception {
testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short) 1);
testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short) 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a typo in the code, SASL_PLAINTEXT is tested a few lines above this.

}

/**
Expand Down Expand Up @@ -1466,7 +1466,7 @@ private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtoc

private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
createSelector(securityProtocol, saslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
}

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,13 @@ object ConfigCommand extends Config {
private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
val props = new Properties
if (opts.options.has(opts.addConfig)) {
//split by commas, but avoid those in [], then into KV pairs
// Split list by commas, but avoid those in [], then into KV pairs
// Each KV pair is of format key=value, split them into key and value, using -1 as the limit for split() to
// include trailing empty strings. This is to support empty value (e.g. 'ssl.endpoint.identification.algorithm=')
val pattern = "(?=[^\\]]*(?:\\[|$))"
val configsToBeAdded = opts.options.valueOf(opts.addConfig)
.split("," + pattern)
.map(_.split("""\s*=\s*""" + pattern))
.map(_.split("""\s*=\s*""" + pattern, -1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment explaining why the -1 is needed here?

require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\".")
//Create properties, parsing square brackets from values if necessary
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ object Defaults {
val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
val SslClientAuthRequired = "required"
val SslClientAuthRequested = "requested"
val SslClientAuthNone = "none"
Expand Down Expand Up @@ -955,7 +956,7 @@ object KafkaConfig {
.define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, SslTruststorePasswordDoc)
.define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
.define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
.define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
.define(SslEndpointIdentificationAlgorithmProp, STRING, Defaults.SslEndpointIdentificationAlgorithm, LOW, SslEndpointIdentificationAlgorithmDoc)
.define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
.define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
.define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,20 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts)
assertEquals(1, deletedProps.size)
assertEquals("a", deletedProps.head)

createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "x",
"--entity-type", entityType,
"--alter",
"--add-config", "a=b,c=,d=e,f="))
createOpts.checkArgs()

val addedProps2 = ConfigCommand.parseConfigsToBeAdded(createOpts)
assertEquals(4, addedProps2.size())
assertEquals("b", addedProps2.getProperty("a"))
assertEquals("e", addedProps2.getProperty("d"))
assertTrue(addedProps2.getProperty("c").isEmpty)
assertTrue(addedProps2.getProperty("f").isEmpty)
}

@Test(expected = classOf[IllegalArgumentException])
Expand Down
22 changes: 20 additions & 2 deletions docs/security.html
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,29 @@ <h3><a id="security_ssl" href="#security_ssl">7.2 Encryption and Authentication
<li>validity: the valid time of the certificate in days.</li>
</ol>
<br>
Note: By default the property <code>ssl.endpoint.identification.algorithm</code> is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:

<h5><a id="security_confighostname" href="#security_confighostname">Configuring Host Name Verification</a></h5>
From Kafka version 2.0.0 onwards, host name verification of servers is enabled by default for client connections
as well as inter-broker connections to prevent man-in-the-middle attacks. Server host name verification may be disabled
by setting <code>ssl.endpoint.identification.algorithm</code> to an empty string. For example,
<pre class="brush: text;"> ssl.endpoint.identification.algorithm=</pre>
For dynamically configured broker listeners, hostname verification may be disabled using <code>kafka-configs.sh</code>.
For example,
<pre class="brush: text;">
bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
</pre>

For older versions of Kafka, <code>ssl.endpoint.identification.algorithm</code> is not defined by default, so host name
verification is not performed. The property should be set to <code>HTTPS</code> to enable host name verification.

<pre class="brush: text;"> ssl.endpoint.identification.algorithm=HTTPS </pre>

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:
Host name verification must be enabled to prevent man-in-the-middle attacks if server endpoints are not validated
externally.

<h5><a id="security_configcerthostname" href="#security_configcerthstname">Configuring Host Name In Certificates</a></h5>
If host name verification is enabled, clients will verify the server's fully qualified domain name (FQDN) against one of
the following two fields:
<ol>
<li>Common Name (CN)
<li>Subject Alternative Name (SAN)
Expand Down
1 change: 1 addition & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2
<ul>
<li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config <code>offsets.retention.minutes</code> to 1440.</li>
<li>Support for Java 7 has been dropped, Java 8 is now the minimum version required.</li>
<li> The default value for <code>ssl.endpoint.identification.algorithm</code> was changed to <code>https</code>, which performs hostname verification (man-in-the-middle attacks are possible otherwise). Set <code>ssl.endpoint.identification.algorithm</code> to an empty string to restore the previous behaviour. </li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends the lower interval of <code>max.connections.per.ip minimum</code> to zero and therefore allows IP-based filtering of inbound connections.</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-272%3A+Add+API+version+tag+to+broker%27s+RequestsPerSec+metric">KIP-272</a>
added API version tag to the metric <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public KafkaEmbedded(final Properties config, final MockTime time) throws IOExce
private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
final Properties effectiveConfig = new Properties();
effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "localhost");
effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
Expand All @@ -100,7 +100,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) throws IO
}

/**
* This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
* This broker's `metadata.broker.list` value. Example: `localhost:9092`.
* <p>
* You can use this to tell Kafka producers and consumers how to connect to this instance.
*/
Expand Down