Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
tokenCache);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder();
channelBuilder = new PlaintextChannelBuilder(listenerName);
break;
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,17 @@

public class PlaintextChannelBuilder implements ChannelBuilder {
private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
private final ListenerName listenerName;
private Map<String, ?> configs;

/**
* Constructs a plaintext channel builder. ListenerName is non-null whenever
* it's instantiated in the broker and null otherwise.
*/
public PlaintextChannelBuilder(ListenerName listenerName) {
this.listenerName = listenerName;
}

public void configure(Map<String, ?> configs) throws KafkaException {
this.configs = configs;
}
Expand All @@ -43,7 +52,7 @@ public void configure(Map<String, ?> configs) throws KafkaException {
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer);
PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer, listenerName);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
Expand All @@ -58,10 +67,12 @@ public void close() {}
private static class PlaintextAuthenticator implements Authenticator {
private final PlaintextTransportLayer transportLayer;
private final KafkaPrincipalBuilder principalBuilder;
private final ListenerName listenerName;

private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer) {
private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) {
this.transportLayer = transportLayer;
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
this.listenerName = listenerName;
}

@Override
Expand All @@ -70,7 +81,10 @@ public void authenticate() throws IOException {}
@Override
public KafkaPrincipal principal() {
InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress();
return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress));
// listenerName should only be null in Client mode where principal() should not be called
if (listenerName == null)
throw new IllegalStateException("Unexpected call to principal() when listenerName is null");
return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress, listenerName.value()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ListenerName listenerName() {
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
Authenticator authenticator = new SslAuthenticator(configs, transportLayer);
Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
Expand Down Expand Up @@ -152,10 +152,12 @@ private String peerHost(SelectionKey key) {
private static class SslAuthenticator implements Authenticator {
private final SslTransportLayer transportLayer;
private final KafkaPrincipalBuilder principalBuilder;
private final ListenerName listenerName;

private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer) {
private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName) {
this.transportLayer = transportLayer;
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
this.listenerName = listenerName;
}
/**
* No-Op for plaintext authenticator
Expand All @@ -170,7 +172,13 @@ public void authenticate() throws IOException {}
@Override
public KafkaPrincipal principal() {
InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress();
SslAuthenticationContext context = new SslAuthenticationContext(transportLayer.sslSession(), clientAddress);
// listenerName should only be null in Client mode where principal() should not be called
if (listenerName == null)
throw new IllegalStateException("Unexpected call to principal() when listenerName is null");
SslAuthenticationContext context = new SslAuthenticationContext(
transportLayer.sslSession(),
clientAddress,
listenerName.value());
return principalBuilder.build(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import java.net.InetAddress;


/**
* An object representing contextual information from the authentication session. See
* {@link SaslAuthenticationContext} and {@link SslAuthenticationContext}.
* {@link PlaintextAuthenticationContext}, {@link SaslAuthenticationContext}
* and {@link SslAuthenticationContext}. This class is only used in the broker.
*/
public interface AuthenticationContext {
/**
Expand All @@ -32,4 +34,9 @@ public interface AuthenticationContext {
* Address of the authenticated client
*/
InetAddress clientAddress();

/**
* Name of the listener used for the connection
*/
String listenerName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be null? Maybe we should return Optional<String>. We would need to update the KIP too in that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AuthenticationContext is only used on the server-side, so listener name should never be null.

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

public class PlaintextAuthenticationContext implements AuthenticationContext {
private final InetAddress clientAddress;
private final String listenerName;

public PlaintextAuthenticationContext(InetAddress clientAddress) {
public PlaintextAuthenticationContext(InetAddress clientAddress, String listenerName) {
this.clientAddress = clientAddress;
this.listenerName = listenerName;
}

@Override
Expand All @@ -35,4 +37,9 @@ public InetAddress clientAddress() {
return clientAddress;
}

@Override
public String listenerName() {
return listenerName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
package org.apache.kafka.common.security.auth;

import javax.security.sasl.SaslServer;

import java.net.InetAddress;

public class SaslAuthenticationContext implements AuthenticationContext {
private final SaslServer server;
private final SecurityProtocol securityProtocol;
private final InetAddress clientAddress;
private final String listenerName;

public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol, InetAddress clientAddress) {
public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol, InetAddress clientAddress, String listenerName) {
this.server = server;
this.securityProtocol = securityProtocol;
this.clientAddress = clientAddress;
this.listenerName = listenerName;
}

public SaslServer server() {
Expand All @@ -43,4 +46,9 @@ public SecurityProtocol securityProtocol() {
public InetAddress clientAddress() {
return clientAddress;
}

@Override
public String listenerName() {
return listenerName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
public class SslAuthenticationContext implements AuthenticationContext {
private final SSLSession session;
private final InetAddress clientAddress;
private final String listenerName;

public SslAuthenticationContext(SSLSession session, InetAddress clientAddress) {
public SslAuthenticationContext(SSLSession session, InetAddress clientAddress, String listenerName) {
this.session = session;
this.clientAddress = clientAddress;
this.listenerName = listenerName;
}

public SSLSession session() {
Expand All @@ -41,4 +43,9 @@ public SecurityProtocol securityProtocol() {
public InetAddress clientAddress() {
return clientAddress;
}

@Override
public String listenerName() {
return listenerName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void authenticate() throws IOException {

@Override
public KafkaPrincipal principal() {
SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress());
SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress(), listenerName.value());
KafkaPrincipal principal = principalBuilder.build(context);
if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
principal.tokenAuthenticated(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.easymock.EasyMock;
import org.junit.Test;

Expand All @@ -38,7 +39,6 @@
public class ChannelBuildersTest {

@Test
@SuppressWarnings("deprecation")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we remove this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not use any deprecated classes, so it's unecessary

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, OldPrincipalBuilder extends the deprecated class but it's not deprecated itself.

public void testCreateOldPrincipalBuilder() throws Exception {
TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
Authenticator authenticator = EasyMock.mock(Authenticator.class);
Expand All @@ -51,7 +51,7 @@ public void testCreateOldPrincipalBuilder() throws Exception {
assertTrue(OldPrincipalBuilder.configured);

// test delegation
KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
assertEquals(OldPrincipalBuilder.PRINCIPAL_NAME, principal.getName());
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void setUp() throws Exception {
this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs);
this.server.start();
this.time = new MockTime();
this.channelBuilder = new PlaintextChannelBuilder();
this.channelBuilder = new PlaintextChannelBuilder(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
this.channelBuilder.configure(configs);
this.metrics = new Metrics();
this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder, new LogContext());
Expand Down Expand Up @@ -305,7 +305,7 @@ public void testMute() throws Exception {

@Test
public void registerFailure() throws Exception {
ChannelBuilder channelBuilder = new PlaintextChannelBuilder() {
ChannelBuilder channelBuilder = new PlaintextChannelBuilder(null) {
@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
MemoryPool memoryPool) throws KafkaException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,7 @@ public void testClientAuthenticationRequestedNotProvided() throws Exception {
*/
@Test
public void testInvalidSecureRandomImplementation() throws Exception {
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
try {
try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false)) {
sslClientConfigs.put(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, "invalid");
channelBuilder.configure(sslClientConfigs);
fail("SSL channel configured with invalid SecureRandom implementation");
Expand All @@ -444,8 +443,7 @@ public void testInvalidSecureRandomImplementation() throws Exception {
*/
@Test
public void testInvalidTruststorePassword() throws Exception {
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
try {
try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false)) {
sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
channelBuilder.configure(sslClientConfigs);
fail("SSL channel configured with invalid truststore password");
Expand All @@ -459,8 +457,7 @@ public void testInvalidTruststorePassword() throws Exception {
*/
@Test
public void testInvalidKeystorePassword() throws Exception {
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
try {
try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false)) {
sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
channelBuilder.configure(sslClientConfigs);
fail("SSL channel configured with invalid keystore password");
Expand Down Expand Up @@ -767,7 +764,7 @@ public void testCloseSsl() throws Exception {

@Test
public void testClosePlaintext() throws Exception {
testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder());
testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder(null));
}

private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,21 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception
DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
transportLayer, oldPrincipalBuilder, null);

KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(
InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals("foo", principal.getName());

builder.close();

verifyAll();
}

@Test
public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())));
assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
builder.close();
}

@Test
Expand All @@ -86,12 +88,12 @@ public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception {
DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
transportLayer, oldPrincipalBuilder, null);

KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost()));
KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals("foo", principal.getName());

builder.close();

verifyAll();
}

Expand All @@ -105,10 +107,12 @@ public void testUseSessionPeerPrincipalForSsl() throws Exception {

DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);

KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost()));
KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals("foo", principal.getName());

builder.close();
verifyAll();
}

Expand All @@ -124,10 +128,11 @@ public void testPrincipalBuilderScram() throws Exception {
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);

KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost()));
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals("foo", principal.getName());

builder.close();
verifyAll();
}

Expand All @@ -146,10 +151,11 @@ public void testPrincipalBuilderGssapi() throws Exception {
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);

KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost()));
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals("foo", principal.getName());

builder.close();
verifyAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
}

private def sendRecords(numRecords: Int, tp: TopicPartition) {
protected final def sendRecords(numRecords: Int, tp: TopicPartition) {
val futures = (0 until numRecords).map { i =>
val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
debug(s"Sending this record: $record")
Expand All @@ -344,7 +344,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
}

protected def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
protected final def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,
topic: String = topic,
Expand Down
Loading