KAFKA-6750: Add listener name to authentication context (KIP-282)#4829
KAFKA-6750: Add listener name to authentication context (KIP-282)#4829ijuma merged 6 commits intoapache:trunkfrom
Conversation
cabe8e1 to
ed2d367
Compare
…text Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com> Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
ed2d367 to
9c519d7
Compare
|
Thanks for the PR. FYI, the text in italics in the PR is meant to be replaced with your own message. ;) |
|
Thanks @ijuma I updated the description accordingly 😄 |
| import org.junit.Assert._ | ||
| import org.junit.{Before, Test} | ||
|
|
||
| class ListenerAndPrincipalBuilderTest extends BaseRequestTest { |
There was a problem hiding this comment.
Do we need a new test for this? These tests are expensive since they involve starting several brokers. Ideally, we'd update one of the existing tests to also check the listener name.
There was a problem hiding this comment.
We had a look at other tests extending BaseRequestTest and this does not really fit anywhere. We tried merging it into KafkaMetricReporterExceptionHandlingTest (one test checking both logics) but it really looked awful. As each test needs its own setup, the overall logic made little sense even with comments.
Yes the test takes 5 secs but we think it's clearer to keep it separated.
There was a problem hiding this comment.
FYI we pushed the experiment into a branch: https://github.com/mimaison/kafka/blob/listener-name-with-miscellaneous-test/core/src/test/scala/unit/kafka/server/KafkaMiscellaneousRequestTest.scala
There was a problem hiding this comment.
@ijuma As the deadline is today, can you have a look?
There was a problem hiding this comment.
What about PlaintextEndToEndAuthorizationTest?
|
This is a small feature, so we have an extra week. |
ijuma
left a comment
There was a problem hiding this comment.
Thanks for the PR, left a few comments.
| public class PlaintextChannelBuilder implements ChannelBuilder { | ||
| private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class); | ||
| private Map<String, ?> configs; | ||
| private ListenerName listenerName; |
| public class ChannelBuildersTest { | ||
|
|
||
| @Test | ||
| @SuppressWarnings("deprecation") |
There was a problem hiding this comment.
This test does not use any deprecated classes, so it's unecessary
There was a problem hiding this comment.
I see, OldPrincipalBuilder extends the deprecated class but it's not deprecated itself.
| } catch (KafkaException e) { | ||
| // Expected exception | ||
| } finally { | ||
| channelBuilder.close(); |
| import org.junit.Assert._ | ||
| import org.junit.{Before, Test} | ||
|
|
||
| class ListenerAndPrincipalBuilderTest extends BaseRequestTest { |
There was a problem hiding this comment.
What about PlaintextEndToEndAuthorizationTest?
| InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress(); | ||
| return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress)); | ||
| // listenerName should only be null in Client mode | ||
| String listenerNameStr = listenerName != null ? listenerName.value() : null; |
There was a problem hiding this comment.
This seems a bit brittle. Maybe the field should be Optional<ListenerName>?
There was a problem hiding this comment.
We should never be calling principal() in client mode.
There was a problem hiding this comment.
We should probably not have the null check then, better fail fast. Same for the other case.
| /** | ||
| * Name of the listener used for the connection | ||
| */ | ||
| String listenerName(); |
There was a problem hiding this comment.
Can this be null? Maybe we should return Optional<String>. We would need to update the KIP too in that case.
There was a problem hiding this comment.
AuthenticationContext is only used on the server-side, so listener name should never be null.
|
Regarding The We don't have to check for In my opinion, having an |
|
@mimaison, // listenerName should only be null in Client mode
+ String listenerNameStr = listenerName != null ? listenerName.value() : null;Another option would be to have a Am I missing something? Also cc @rajinisivaram in case she has some thoughts. |
|
@ijuma There are a few internal methods where |
|
Synced with @rajinisivaram offline and |
ijuma
left a comment
There was a problem hiding this comment.
A few minor comments and then I think this is ready to be merged.
| public class PlaintextChannelBuilder implements ChannelBuilder { | ||
| private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class); | ||
| private Map<String, ?> configs; | ||
| private final ListenerName listenerName; |
There was a problem hiding this comment.
static fields first, then final, then mutable.
|
|
||
| /** | ||
| * Constructs a plaintext channel builder. ListenerName is provided only | ||
| * for server channel builder and will be null for client channel builder. |
There was a problem hiding this comment.
It's actually a bit more complicated. It's non-null whenever it's instantiated in the broker and null otherwise. In the broker case, we create both server and client channel builders, but it's set in both (for sending and receiving requests respectively).
| InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress(); | ||
| return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress)); | ||
| // listenerName should only be null in Client mode | ||
| String listenerNameStr = listenerName != null ? listenerName.value() : null; |
There was a problem hiding this comment.
We should probably not have the null check then, better fail fast. Same for the other case.
| * An object representing contextual information from the authentication session. See | ||
| * {@link SaslAuthenticationContext} and {@link SslAuthenticationContext}. | ||
| * {@link PlaintextAuthenticationContext}, {@link SaslAuthenticationContext} | ||
| * and {@link SslAuthenticationContext}. |
There was a problem hiding this comment.
Maybe we should mention that this class is only used in the broker.
| package org.apache.kafka.common.security.auth; | ||
|
|
||
| import javax.net.ssl.SSLSession; | ||
|
|
| transportLayer, oldPrincipalBuilder, null); | ||
|
|
||
| KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())); | ||
| KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); |
There was a problem hiding this comment.
A few long lines in this class. The goal is that they should fit within the GitHub window.
| } | ||
|
|
||
| private def sendRecords(numRecords: Int, tp: TopicPartition) { | ||
| protected def sendRecords(numRecords: Int, tp: TopicPartition) { |
| // which we derive corresponding principals | ||
| object PlaintextEndToEndAuthorizationTest { | ||
| var clientListenerName: String = "" | ||
| var serverListenerName: String = "" |
There was a problem hiding this comment.
private and volatile? Also, it seems like initialising them as None would be better.
| fail("Should have thrown a TopicAuthorizationException") | ||
| } catch { | ||
| case tae: TopicAuthorizationException => //expected | ||
| } |
There was a problem hiding this comment.
Use intercept[TopicAuthorizationException](sendRecords(1, tp)
|
@ijuma Thanks for the feedback! I've addressed all your comments. |
| intercept[TopicAuthorizationException](sendRecords(1, tp)) | ||
|
|
||
| assertEquals("CLIENT", PlaintextEndToEndAuthorizationTest.clientListenerName.get) | ||
| assertEquals("SERVER", PlaintextEndToEndAuthorizationTest.serverListenerName.get) |
There was a problem hiding this comment.
The right way to do this is assertEquals(Some("SERVER"), PlaintextEndToEndAuthorizationTest.serverListenerName) so that one gets a reasonable error if the Option is None.
…grained-acl-create-topics * apache-github/trunk: KAFKA-5588: Remove deprecated --new-consumer tools option (apache#5097) MINOR: Fix for the location of the trogdor.sh executable file in the documentation. (apache#5040) KAFKA-6997: Exclude test-sources.jar when $INCLUDE_TEST_JARS is FALSE MINOR: docs should point to latest version (apache#5132) KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298) [KAFKA-6730] Simplify State Store Recovery (apache#5013) MINOR: Rename package `internal` to `internals` for consistency (apache#5137) KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (apache#4801) MINOR: Add missing configs for resilience settings MINOR: Add regression tests for KTable mapValues and filter (apache#5134) KAFKA-6750: Add listener name to authentication context (KIP-282) (apache#4829) KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (apache#4956) KAFKA-6938: Add documentation for accessing Headers on Kafka Streams Processor API (apache#5128) KAFKA-6813: return to double-counting for count topology names (apache#5075) KAFKA-5919; Adding checks on "version" field for tools using it MINOR: Remove deprecated KafkaStreams constructors in docs (apache#5118)
…refix * apache-github/trunk: KAFKA-6726: Fine Grained ACL for CreateTopics (KIP-277) (apache#4795) KAFKA-5588: Remove deprecated --new-consumer tools option (apache#5097) MINOR: Fix for the location of the trogdor.sh executable file in the documentation. (apache#5040) KAFKA-6997: Exclude test-sources.jar when $INCLUDE_TEST_JARS is FALSE MINOR: docs should point to latest version (apache#5132) KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298) [KAFKA-6730] Simplify State Store Recovery (apache#5013) MINOR: Rename package `internal` to `internals` for consistency (apache#5137) KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (apache#4801) MINOR: Add missing configs for resilience settings MINOR: Add regression tests for KTable mapValues and filter (apache#5134) KAFKA-6750: Add listener name to authentication context (KIP-282) (apache#4829) KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (apache#4956) KAFKA-6938: Add documentation for accessing Headers on Kafka Streams Processor API (apache#5128) KAFKA-6813: return to double-counting for count topology names (apache#5075) KAFKA-5919; Adding checks on "version" field for tools using it MINOR: Remove deprecated KafkaStreams constructors in docs (apache#5118)
…ache#4829) PrincipalBuilder implementations can now take the listener into account when creating the Principal. This is especially interesting in deployments where inter-broker traffic is on a different listener than client traffic or when the same protocol is used by multiple listeners. The change in itself is mostly "plumbing" as the listener name needs to be passed from ChannelBuilders all the way down to all classes implementing AuthenticationContext. Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk> Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com> Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar ecomar@uk.ibm.com
Co-authored-by: Mickael Maison mickael.maison@gmail.com
This KIP adds the String value of the ListenerName used for the connection to the AuthenticationContext.
This allows PrincipalBuilders to retrieve this value and build different Principal based on the source of the connection. This is especially interesting in deployments where inter-broker traffic is on a different network/listener than user traffic or when the same protocol is used by several listeners.
The change in itself is mostly "plumbing" as the listener name needs to be passed from
ChannelBuildersall the way down to all classes implementingAuthenticationContext.Committer Checklist (excluded from commit message)