[Question] How do I specify a KafkaUser that has access to all topics and is authorized to create new topics? #2141

Closed
LeonardAukea opened this issue Oct 29, 2019 · 17 comments

@LeonardAukea

Does anyone know how I could create a 'super' KafkaUser that is allowed to create topics and can operate on all topics?

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: super-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  type: allow
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: *
          patternType: literal
        operation: All

I've been trying to get a faust worker (https://faust.readthedocs.io/en/latest/) up and running but it fails:

kafka.errors.TopicAuthorizationFailedError: [Error 29] TopicAuthorizationFailedError: Cannot create topic: faust-app-__assignor-__leader (29): Authorization failed.

According to the Strimzi config, the creation of topics should be allowed. But I think the name of the topic that Faust wants to create may not be valid because of the underscores. Either way, I'm not really sure how to go about this, but I would like to be able to specify a user that has access to all topics. Any help is much appreciated.

@scholzj
Member

scholzj commented Oct 29, 2019

I will need to look at what exactly the problem is here. In the meantime, you can configure the user as a super user: https://strimzi.io/docs/latest/full.html#super_users

@scholzj
Member

scholzj commented Oct 29, 2019

Ok, I had a look at this ... The YAML as you have it above doesn't work for me. It complains about the *. It looks like I need to do this to have it accepted:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: super-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  type: allow
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: '*'
          patternType: literal
        operation: All

But once I change that, it seems to work and I can create the topic:

$ bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap-myproject.192.168.64.126.nip.io:443 --command-config ./topics-config.properties --create --topic faust-app-__assignor-__leader --partitions 3 --replication-factor 3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
$ bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap-myproject.192.168.64.126.nip.io:443 --command-config ./topics-config.properties --describe
Topic:faust-app-__assignor-__leader	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: faust-app-__assignor-__leader	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: faust-app-__assignor-__leader	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: faust-app-__assignor-__leader	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1

I do not really use Faust for anything so I cannot try it with that. Maybe it triggers some other rights than the kafka-topics tool. But maybe the * is the issue?
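
A likely reason the unquoted * is rejected: in YAML, a `*` at the start of a value introduces an alias, so a bare `*` is not a valid plain scalar. The Python sketch below shows one way to decide whether a value needs quoting before it is written into a manifest. The helper names are hypothetical, and the rule is deliberately simplified (it only looks at the first character), so treat it as an illustration rather than a full YAML implementation.

```python
# Simplified sketch: decide whether a string must be quoted to be used as a
# YAML value. Only the first character is checked against YAML indicator
# characters; this is not a complete implementation of the YAML spec.

# Characters that cannot start an unquoted (plain) YAML scalar.
# '*' starts an alias and '&' starts an anchor, which is why `name: *` fails.
_LEADING_INDICATORS = set("*&!|>'\"%@`#,[]{}")


def needs_quoting(value: str) -> bool:
    """Return True if `value` should be wrapped in quotes (hypothetical helper)."""
    return value == "" or value[0] in _LEADING_INDICATORS


def yaml_value(value: str) -> str:
    """Render `value` as a YAML scalar, single-quoting it when required."""
    if needs_quoting(value):
        # Inside single quotes, a literal single quote is written as ''.
        return "'" + value.replace("'", "''") + "'"
    return value


if __name__ == "__main__":
    for name in ["*", "my-topic", "faust-app-__assignor-__leader"]:
        print(f"name: {yaml_value(name)}")
```

Only the `*` entry comes out quoted; the Faust topic name with underscores is left as it is.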

@LeonardAukea
Author

@scholzj Thanks for looking into this. Sorry for not being clear enough. I never used the '*' syntax. I just thought that it might be the way it should be used (following Kubernetes syntax). Thanks for pointing me to the right location in the docs. Could you please share your super user config? In the meantime, I will try following the description in the docs.

@scholzj
Member

scholzj commented Oct 30, 2019

Well, funnily enough, at least for creating the topic, it actually works as you described it there, with the only exception of wrapping the * in quotes. But it is not a real super user: this gives the user all rights to topics only, not to consumer groups, cluster resources etc. So for an actual super user, you should follow the docs I pointed out.
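
To make the difference concrete: an ACL list that only names `topic` resources leaves consumer groups and cluster-level operations uncovered. The sketch below builds a KafkaUser manifest with wildcard ACLs for topics, groups, and the cluster, and prints it as JSON (which kubectl also accepts). The resource types `group` and `cluster`, the shape of the cluster rule, and the helper names are assumptions for illustration and are not taken from this thread; check them against the Strimzi documentation for your version. For a real super user, the `superUsers` route from the docs remains the simpler option.

```python
import json


def wildcard_acl(resource_type: str) -> dict:
    """One ACL rule granting all operations on every resource of a type."""
    resource = {"type": resource_type}
    if resource_type != "cluster":
        # Assumption: the cluster resource takes no name, while topics and
        # groups use the literal '*' wildcard as in the example above.
        resource.update({"name": "*", "patternType": "literal"})
    return {"resource": resource, "operation": "All"}


def broad_kafka_user(name: str, cluster: str) -> dict:
    """Build a KafkaUser manifest (hypothetical helper) as a plain dict."""
    return {
        "apiVersion": "kafka.strimzi.io/v1beta1",
        "kind": "KafkaUser",
        "metadata": {"name": name, "labels": {"strimzi.io/cluster": cluster}},
        "spec": {
            "authentication": {"type": "tls"},
            "authorization": {
                "type": "simple",
                "acls": [wildcard_acl(t) for t in ("topic", "group", "cluster")],
            },
        },
    }


if __name__ == "__main__":
    print(json.dumps(broad_kafka_user("super-user", "my-cluster"), indent=2))
```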

@LeonardAukea
Author

hehe. Yeah I will probably create a superuser for all the faust applications just in case. Just to clarify, since I'm not sure that I follow the docs completely.

  1. Create a KafkaUser
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: super-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  type: allow
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: '*'
          patternType: literal
        operation: All
  2. Add the user to the spec
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  kafka:
    # ...
    authorization:
      type: tls
      superUsers:
        - CN=super-user
    # ...
  zookeeper:
    # ...

@scholzj
Member

scholzj commented Oct 30, 2019

You do not have to specify the ACLs for the user when setting it as super user. I would do it like this:

  1. Set the user as super user:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  kafka:
    # ...
    authorization:
      type: simple
      superUsers:
        - CN=super-user
    # ...
  zookeeper:
    # ...
  2. Create the user with authentication only:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: super-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
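
The `CN=super-user` entry under `superUsers` has to match the principal the broker sees. For a KafkaUser with TLS authentication that is `CN=` followed by the resource name (the `username` field in the status of a TLS KafkaUser, quoted further down in this thread, has this form). A small sketch of that mapping follows. The function name is hypothetical, and the SCRAM branch assumes that SCRAM-SHA-512 users are identified by the plain name, which is worth verifying for your Strimzi version.

```python
def super_user_principal(user_name: str, auth_type: str = "tls") -> str:
    """Return the string to list under superUsers for a KafkaUser (sketch)."""
    if auth_type == "tls":
        # TLS users are identified by the certificate subject, e.g. CN=my-user.
        return f"CN={user_name}"
    if auth_type == "scram-sha-512":
        # Assumption: SCRAM users are identified by the bare user name.
        return user_name
    raise ValueError(f"unsupported authentication type: {auth_type}")


if __name__ == "__main__":
    print(super_user_principal("super-user"))
```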

@LeonardAukea
Author

Thanks. It worked either way :)

@scholzj
Member

scholzj commented Oct 30, 2019

Yeah, the order doesn't matter so much. But the user needs to wait for the brokers to be updated, that is all. Can we close this issue? Or do you have something more?

@LeonardAukea
Author

No, we can close it. Thank you for your help! Much appreciated.

@alokhom

alokhom commented Aug 26, 2020

I am facing this error when an external Camel Kafka app accesses a Strimzi topic running on OpenShift:
TopicAuthorizationException - Not authorized to access topics: [my-topic]
The idea is to work out user access from an external app, not specifically as a super user.

Background:
I used the KafkaUser user.p12 keystore, user.password and the ca.crt truststore in the app.
Client authentication is OK, but topic authorization is not working.
I took some ideas for client authentication from: #3036 (comment)

here is my Kafka config -

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  entityOperator:
    topicOperator:
      reconciliationIntervalSeconds: 90
    userOperator:
      reconciliationIntervalSeconds: 120
  kafka:
    authorization:
      superUsers:
        - CN=my-user
      type: simple
    config:
      log.message.format.version: '2.5'
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
      external:
        overrides:
          bootstrap:
            host: bootstrap.apps.kafka.cluster42.openshift-abcd.com
          brokers:
            - broker: 0
              host: broker-0.apps.kafka.cluster42.openshift-abcd.com
            - broker: 1
              host: broker-1.apps.kafka.cluster42.openshift-abcd.com
            - broker: 2
              host: broker-2.apps.kafka.cluster42.openshift-abcd.com
        type: route
      plain:
        authentiation:
          type: scram-sha-512
      tls:
        authentiation:
          type: tls
    replicas: 3
    storage:
      class: rook-ceph-block
      size: 20Gi
      type: persistent-claim
    version: 2.5.0
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral

--
Kafka Topic

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  labels:
    strimzi.io/cluster: my-cluster
  name: my-topic
  namespace: kafka
spec:
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824
  partitions: 10
  replicas: 3

--
here is my KafkaUser -

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  labels:
    strimzi.io/cluster: my-cluster
  name: my-user
  namespace: kafka
spec:
  authentication:
    type: tls

@scholzj
Member

scholzj commented Aug 26, 2020

So when you say that you have an external Apache Camel application ... that means it is running outside of OCP and connects using the routes? If that is the case the problem is that you have no authentication enabled for the external interface. So you configured CN=my-user as super user. But without enabling authentication on the listener the user will be authenticated as ANONYMOUS. You will need to add the authentication section also to the external listener:

    listeners:
      external:
        overrides:
          bootstrap:
            host: bootstrap.apps.kafka.cluster42.openshift-abcd.com
          brokers:
            - broker: 0
              host: broker-0.apps.kafka.cluster42.openshift-abcd.com
            - broker: 1
              host: broker-1.apps.kafka.cluster42.openshift-abcd.com
            - broker: 2
              host: broker-2.apps.kafka.cluster42.openshift-abcd.com
        type: route
        authentiation:
          type: tls
      plain:
        authentiation:
          type: scram-sha-512
      tls:
        authentiation:
          type: tls

@alokhom

alokhom commented Aug 26, 2020

Yes, it is running outside of OCP and connects using the routes.
OK, I did this: added the authentication section to the external listener as well.

Here's the output:

23:27:49.537 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0.redhat-00003
23:27:49.537 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: f960e3745ec74111
23:27:49.538 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1598477269537
23:27:49.538 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.c.component.kafka.KafkaConsumer - Reconnecting my-topic-Thread 0 to topic my-topic after 5000 ms
23:27:49.902 [kafka-producer-network-thread | producer-1] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 40 : {my-topic=TOPIC_AUTHORIZATION_FAILED}
23:27:49.903 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Topic authorization failed for topics [my-topic]
23:27:49.903 [Camel (MyCamel) thread #2 - timer://foo] ERROR o.a.c.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-DESKTOP-72U4DSI-1598477229628-0-76 on ExchangeId: ID-DESKTOP-72U4DSI-1598477229628-0-75). Exhausted after delivery attempt: 1 caught: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[producer-route    ] [producer-route    ] [timer://foo?period=1000                                                       ] [        68]
[producer-route    ] [route-setBody     ] [setBody[simple{Hello World from camel-context.xml with ID ${id}}]             ] [         0]
[producer-route    ] [_kafka1           ] [kafka:my-topic?securityProtocol=SSL&sslTruststoreLocation=C:/Users/F2531353.LI] [        68]


kafka pod logs

2020-08-26 21:27:08,421 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-7]
2020-08-26 21:27:08,482 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-4]
2020-08-26 21:27:13,763 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-7]
2020-08-26 21:27:13,839 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-5]
2020-08-26 21:27:19,095 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-1]
2020-08-26 21:27:19,161 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-5]
2020-08-26 21:27:24,404 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-5]
2020-08-26 21:27:24,454 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-3]
2020-08-26 21:27:29,724 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-0]
2020-08-26 21:27:29,781 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-6]
2020-08-26 21:27:35,092 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-6]
2020-08-26 21:27:35,154 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-1]
2020-08-26 21:27:40,404 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-7]
2020-08-26 21:27:40,454 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-2]
2020-08-26 21:27:45,704 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-1]
2020-08-26 21:27:45,714 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-7]
2020-08-26 21:27:51,006 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Topic:LITERAL:my-topic (kafka.authorizer.logger) [data-plane-kafka-request-handler-2]
2020-08-26 21:27:51,061 INFO Principal = User:ANONYMOUS is Denied Operation = Describe from host = 10.131.0.1 on resource = Group:LITERAL:9da3dd7b-b559-4870-89e5-f7d0cb8f3bb0 (kafka.authorizer.logger) [data-plane-kafka-request-handler-3]
2020-08-26 21:28:27,591 INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [group-metadata-manager-0]
2020-08-26 21:38:27,591 INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [group-metadata-manager-0]
2020-08-26 21:48:27,591 INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [group-metadata-manager-0]
2020-08-26 21:58:27,591 INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [group-metadata-manager-0]
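
The authorizer lines above are regular enough to summarise mechanically. The sketch below parses lines in the format shown in these broker logs and counts denials per principal, operation, and resource. A count dominated by `User:ANONYMOUS` suggests the listener is not authenticating clients at all, rather than an ACL being wrong. The regular expression is derived only from the sample lines in this thread and may need adjusting for other Kafka versions; the function name is hypothetical.

```python
import re
from collections import Counter

# Pattern based on the kafka.authorizer.logger lines quoted in this thread.
DENIED = re.compile(
    r"Principal = (?P<principal>\S+) is Denied "
    r"Operation = (?P<operation>\w+) from host = (?P<host>\S+) "
    r"on resource = (?P<resource>\S+)"
)


def summarise_denials(lines):
    """Count (principal, operation, resource) triples among denied requests."""
    counts = Counter()
    for line in lines:
        match = DENIED.search(line)
        if match:
            key = (match["principal"], match["operation"], match["resource"])
            counts[key] += 1
    return counts


if __name__ == "__main__":
    sample = [
        "2020-08-26 21:27:08,421 INFO Principal = User:ANONYMOUS is Denied "
        "Operation = Describe from host = 10.131.0.1 on resource = "
        "Topic:LITERAL:my-topic (kafka.authorizer.logger) "
        "[data-plane-kafka-request-handler-7]",
    ]
    for (principal, operation, resource), n in summarise_denials(sample).items():
        print(f"{n}x {principal} denied {operation} on {resource}")
        if principal == "User:ANONYMOUS":
            print("  -> client was not authenticated on this listener")
```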

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  creationTimestamp: '2020-08-25T14:38:29Z'
  generation: 9
  name: my-cluster
  namespace: kafka
  resourceVersion: '2259038'
  selfLink: /apis/kafka.strimzi.io/v1beta1/namespaces/kafka/kafkas/my-cluster
  uid: 984954e3-0c68-4ea0-a9e0-0d9e8f55f4a6
spec:
  entityOperator:
    topicOperator:
      reconciliationIntervalSeconds: 90
    userOperator:
      reconciliationIntervalSeconds: 120
  kafka:
    authorization:
      superUsers:
        - CN=my-user
      type: simple
    config:
      log.message.format.version: '2.5'
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
      external:
        authentiation:
          type: tls
        overrides:
          bootstrap:
            host: bootstrap.apps.kafka.cluster42.openshift-abcd.com
          brokers:
            - broker: 0
              host: broker-0.apps.kafka.cluster42.openshift-abcd.com
            - broker: 1
              host: broker-1.apps.kafka.cluster42.openshift-abcd.com
            - broker: 2
              host: broker-2.apps.kafka.cluster42.openshift-abcd.com
        type: route
      plain:
        authentiation:
          type: scram-sha-512
      tls:
        authentiation:
          type: tls
    replicas: 3
    storage:
      class: rook-ceph-block
      size: 20Gi
      type: persistent-claim
    version: 2.5.0
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral

Kafka Topic

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  creationTimestamp: '2020-08-25T14:41:13Z'
  generation: 1
  labels:
    strimzi.io/cluster: my-cluster
  name: my-topic
  namespace: kafka
  resourceVersion: '1468960'
  selfLink: /apis/kafka.strimzi.io/v1beta1/namespaces/kafka/kafkatopics/my-topic
  uid: 52d65111-ba4f-49ad-bb56-1db93b35d60b
spec:
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824
  partitions: 10
  replicas: 3

KafkaUser

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  creationTimestamp: '2020-08-26T20:03:09Z'
  generation: 2
  labels:
    strimzi.io/cluster: my-cluster
  name: my-user
  namespace: kafka
  resourceVersion: '2266268'
  selfLink: /apis/kafka.strimzi.io/v1beta1/namespaces/kafka/kafkausers/my-user
  uid: ecab2527-000a-47cb-bae8-e24f31bd2406
spec:
  authentication:
    type: tls
status:
  conditions:
    - lastTransitionTime: '2020-08-26T20:45:53.613018Z'
      status: 'True'
      type: Ready
  observedGeneration: 2
  secret: my-user
  username: CN=my-user

Also a query:
Can you tell me why there are two sections of

        authentiation:
          type: tls

there?
One is in the internal TLS listener and another in the external listener.
As per your notes in #3036 (comment), the truststore with ca.crt had worked for one-way TLS (server authentication).
After that, when I tried client authentication with the user.p12 keystore and user.password included in camel-context.xml and ran
$ mvn -Drun.jvmArguments="-Dbootstrap.server=bootstrap.apps.kafka.cluster42.openshift-abcd.com:443" clean package spring-boot:run -Djavax.net.debug=ssl,handshake
why do I have to add the other TLS section for the external listener?

here is my camel-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd        http://camel.apache.org/schema/spring       http://camel.apache.org/schema/spring/camel-spring.xsd">
    <bean class="org.apache.camel.component.kafka.KafkaComponent" id="kafka">
        <property name="brokers" value="${bootstrap.server}"/>
    </bean>
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route id="consumer-route">
            <from id="kafka-consumer" uri="kafka:my-topic?securityProtocol=SSL&amp;sslTruststoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/truststore.jks&amp;sslTruststorePassword=pack64&amp;sslKeystoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/userKS.jks&amp;sslKeystorePassword=pack64&amp;sslKeyPassword=kreXdZj2Em3i"/>
            <log id="route-log" message="consumer >>> ${body}"/>
        </route>
        <route id="producer-route">
            <from id="route-timer" uri="timer://foo?period=1000"/>
            <setBody id="route-setBody">
                <simple>Hello World from camel-context.xml with ID ${id}</simple>
            </setBody>
            <to id="_kafka1" uri="kafka:my-topic?securityProtocol=SSL&amp;sslTruststoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/truststore.jks&amp;sslTruststorePassword=pack64&amp;sslKeystoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/userKS.jks&amp;sslKeystorePassword=pack64&amp;sslKeyPassword=kreXdZj2Em3i"/>
            <log id="route-log-producer" message="producer >>> ${body}"/>
        </route>
    </camelContext>
</beans>

I generated my stores like this:
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass pack64 -noprompt
kubectl get secret my-user -o jsonpath='{.data.user\.password}' | base64 -d > user.password
kubectl get secret my-user -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
keytool -importkeystore -srckeystore user.p12 -srcstoretype pkcs12 -destkeystore userKS.jks -deststoretype jks
(here I used the user.password to make the userKS.jks)
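
The `base64 -d` steps above can also be done in one pass from the Secret's JSON. This Python sketch assumes the output of `kubectl get secret my-user -o json` is available as a string; the function name is hypothetical and the sample values are fake. It decodes every entry under `data`, so keys that contain dots, such as `user.p12` and `user.password`, need no special handling.

```python
import base64
import json


def decode_secret_data(secret_json: str) -> dict:
    """Map each key under `data` of a Kubernetes Secret to its decoded bytes."""
    secret = json.loads(secret_json)
    return {
        key: base64.b64decode(value)
        for key, value in secret.get("data", {}).items()
    }


if __name__ == "__main__":
    # Stand-in for `kubectl get secret my-user -o json`; the values are fake.
    fake = json.dumps({
        "kind": "Secret",
        "data": {
            "user.password": base64.b64encode(b"example-password").decode(),
            "user.p12": base64.b64encode(b"fake-pkcs12-bytes").decode(),
        },
    })
    decoded = decode_secret_data(fake)
    print(decoded["user.password"].decode())
    # Binary entries such as user.p12 would be written with open(path, "wb").
```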

@scholzj
Member

scholzj commented Aug 26, 2020

Actually, I didn't notice it before and just copied it. But you have authentiation there instead of authentication. Authentication is configured per listener. You can have different listeners with different authentication types, so you need to define it for each of them.
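
A misspelled key like this is easy to miss: nothing in the thread shows it being rejected, and the listener simply ran without authentication. A rough pre-flight check can catch it. The sketch below takes the `listeners` mapping as a Python dict and flags keys that are near-misses of expected ones, plus listeners with no `authentication` at all. The set of expected keys is a small illustrative assumption and is not the full Strimzi schema; the function name is hypothetical.

```python
import difflib

# Assumption: a small, illustrative subset of per-listener keys.
EXPECTED_KEYS = ("authentication", "overrides", "type")


def check_listeners(listeners: dict) -> list:
    """Return human-readable warnings for suspicious listener configs."""
    warnings = []
    for name, config in listeners.items():
        for key in config:
            if key in EXPECTED_KEYS:
                continue
            # Flag unknown keys that closely resemble an expected key.
            close = difflib.get_close_matches(key, EXPECTED_KEYS, n=1, cutoff=0.8)
            if close:
                warnings.append(f"{name}: '{key}' looks like a typo of '{close[0]}'")
        if "authentication" not in config:
            warnings.append(f"{name}: no authentication configured")
    return warnings


if __name__ == "__main__":
    listeners = {
        "external": {"type": "route", "authentiation": {"type": "tls"}},
        "tls": {"authentication": {"type": "tls"}},
    }
    for warning in check_listeners(listeners):
        print(warning)
```

Run against the configuration from this thread, such a check would report the `external` listener twice: once for the near-miss key and once for the missing `authentication` section.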

@alokhom

alokhom commented Aug 26, 2020

oh what a typo on my end !!!
it works like a charm !

00:27:12.481 [Camel (MyCamel) thread #7 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-10
00:27:12.536 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-10
00:27:13.490 [Camel (MyCamel) thread #8 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-13
00:27:13.511 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-13
00:27:14.477 [Camel (MyCamel) thread #9 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-16
00:27:14.495 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-16
00:27:15.485 [Camel (MyCamel) thread #10 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-19
00:27:15.505 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-19
00:27:16.474 [Camel (MyCamel) thread #11 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-22
00:27:16.481 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-22
00:27:17.473 [Camel (MyCamel) thread #12 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-25
00:27:17.478 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-25
00:27:18.491 [Camel (MyCamel) thread #3 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-28
00:27:18.494 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-28
00:27:19.482 [Camel (MyCamel) thread #4 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-31
00:27:19.486 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-31
00:27:20.479 [Camel (MyCamel) thread #5 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-34
00:27:20.480 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-34
00:27:21.480 [Camel (MyCamel) thread #6 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-37
00:27:21.488 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-37
00:27:22.482 [Camel (MyCamel) thread #7 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-40
00:27:22.485 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-40
00:27:23.470 [Camel (MyCamel) thread #8 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-43
00:27:23.475 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-43
00:27:24.483 [Camel (MyCamel) thread #9 - KafkaProducer[my-topic]] INFO producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-46
00:27:24.508 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598480825136-0-46

@alokhom

alokhom commented Aug 26, 2020

Can you tell me how it works?

  1. The internal TLS listener worked for server authentication, and the external listener had no TLS then. I had routes for external access.
  2. Now that KafkaUser TLS authentication is used, why is it not using the same TLS that was used for server authentication?
  • Why did I have to enable TLS for the external listener as well? How does this work? It would be awesome if you could show these interactions in a diagram here. Just a request.

@alokhom

alokhom commented Aug 26, 2020

This time I removed the superUsers entry from the Kafka config and applied ACL-based authorization in the KafkaUser.
Reference: #3036 (comment)
ACL policies applied: https://github.com/strimzi/strimzi-kafka-operator/blob/master/examples/user/kafka-user.yaml
topic: my-topic
group: my-group (which is expected to be created by default and discovered from the ACL lists applied using KafkaACL.sh command. I didn't discover it but guessed the group name from the KafkaUser YAML.)
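
With the super user removed, the user's own ACLs have to cover everything the client does. For the consumer route that means access to both the topic and the consumer group passed as `groupId`. A minimal sketch of such a check follows. The list of required operations (Read and Describe on the topic, Read on the group) is an assumption about typical consumer needs, the function name is hypothetical, and the sketch ignores prefix patterns and any permissions that Kafka implies from others.

```python
def missing_consumer_acls(acls: list, topic: str, group: str) -> list:
    """List (type, name, operation) triples a consumer needs but `acls` lacks."""
    required = [
        ("topic", topic, "Read"),
        ("topic", topic, "Describe"),
        ("group", group, "Read"),
    ]

    def covered(rtype: str, name: str, operation: str) -> bool:
        for acl in acls:
            resource = acl["resource"]
            if resource["type"] != rtype:
                continue
            # A literal '*' name is treated as matching every resource.
            if resource.get("name") not in (name, "*"):
                continue
            if acl["operation"] in (operation, "All"):
                return True
        return False

    return [entry for entry in required if not covered(*entry)]


if __name__ == "__main__":
    topic_only = [
        {"resource": {"type": "topic", "name": "my-topic", "patternType": "literal"},
         "operation": "Read"},
        {"resource": {"type": "topic", "name": "my-topic", "patternType": "literal"},
         "operation": "Describe"},
    ]
    # The group rule is absent, so it is reported as missing.
    print(missing_consumer_acls(topic_only, "my-topic", "my-group"))
```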

I made sure to use a groupId in the Camel consumer, as seen below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd        http://camel.apache.org/schema/spring       http://camel.apache.org/schema/spring/camel-spring.xsd">
    <bean class="org.apache.camel.component.kafka.KafkaComponent" id="kafka">
        <property name="brokers" value="${bootstrap.server}"/>
    </bean>
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route id="consumer-route">
            <from id="kafka-consumer" uri="kafka:my-topic?securityProtocol=SSL&amp;sslTruststoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/truststore.jks&amp;sslTruststorePassword=pack64&amp;sslKeystoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/userKS.jks&amp;sslKeystorePassword=pack64&amp;sslKeyPassword=kreXdZj2Em3i&amp;groupId=my-group"/>
            <log id="route-log" message="consumer >>> ${body}"/>
        </route>
        <route id="producer-route">
            <from id="route-timer" uri="timer://foo?period=1000"/>
            <setBody id="route-setBody">
                <simple>Hello World from camel-context.xml with ID ${id}</simple>
            </setBody>
            <to id="_kafka1" uri="kafka:my-topic?securityProtocol=SSL&amp;sslTruststoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/truststore.jks&amp;sslTruststorePassword=pack64&amp;sslKeystoreLocation=C:/Users/F2531353.LIK/Desktop/amq-examples/camel-kafka-demo/src/main/resources/userKS.jks&amp;sslKeystorePassword=pack64&amp;sslKeyPassword=kreXdZj2Em3i"/>
            <log id="route-log-producer" message="producer >>> ${body}"/>
        </route>
    </camelContext>
</beans>

And it worked like a charm.

01:34:29.434 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1228
01:34:30.420 [Camel (MyCamel) thread #4 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1231
01:34:30.420 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1231
01:34:31.435 [Camel (MyCamel) thread #5 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1234
01:34:31.436 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1234
01:34:32.441 [Camel (MyCamel) thread #6 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1237
01:34:32.443 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1237
01:34:33.430 [Camel (MyCamel) thread #7 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1240
01:34:33.432 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1240
01:34:34.443 [Camel (MyCamel) thread #8 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1243
01:34:34.445 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1243
01:34:35.462 [Camel (MyCamel) thread #9 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1246
01:34:35.510 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1246
01:34:36.430 [Camel (MyCamel) thread #10 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1249
01:34:36.435 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1249
01:34:37.443 [Camel (MyCamel) thread #11 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1252
01:34:37.444 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1252
01:34:38.430 [Camel (MyCamel) thread #12 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1255
01:34:38.431 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1255
01:34:39.431 [Camel (MyCamel) thread #3 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1258
01:34:39.433 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1258
01:34:40.428 [Camel (MyCamel) thread #4 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1261
01:34:40.430 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1261
01:34:41.442 [Camel (MyCamel) thread #5 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1264
01:34:41.443 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-DESKTOP-72U4DSI-1598484455486-0-1264

Many thanks for the really good use-case examples!

@scholzj
Member

scholzj commented Aug 27, 2020

Can you tell me how it works?

  1. The internal TLS listener worked for server authentication, and the external listener had no TLS then. I used routes for external access.
  2. Now that KafkaUser TLS authentication is used, why is it not using the same TLS that was used for server authentication?
  • Why did I have to enable TLS for the external listener as well? How does this work? It would be awesome if you could show these interactions in a diagram here. Just a request.

TLS server authentication is the same as what happens when, for example, you visit a website over HTTPS. It means that your client verifies the identity of the server, but the server (the Kafka broker in this case) does not care about the identity of the client. Only when you enable TLS client authentication (that is, the authentication: ... section) will the broker enforce client authentication and ask the client to present a certificate as well. So your external listener uses TLS by default, but only for server authentication and encryption, not for TLS client authentication.
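
To make the difference concrete, here is a minimal sketch of where that authentication section would sit in the Kafka custom resource. It assumes the older listener layout used with the v1beta1 API and an external listener of type route, as mentioned in the question; the cluster name my-cluster comes from the KafkaUser example at the top of this issue, and the rest of the cluster spec is omitted, so treat it as an illustration rather than a complete resource.

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster            # assumed: same cluster the KafkaUser is labelled with
spec:
  kafka:
    # replicas, storage, config, etc. omitted from this sketch
    listeners:
      tls:
        # Without this block the listener only does server authentication
        # and encryption; with it, clients must present a certificate.
        authentication:
          type: tls
      external:
        type: route           # assumed from the question (OpenShift routes)
        # The same applies here: TLS client authentication has to be
        # enabled per listener, it is not inherited from the internal one.
        authentication:
          type: tls
```

On the client side this corresponds to the two halves of the Camel URI shown earlier: the truststore is enough for server authentication, while the keystore with the user certificate is only needed once the listener has client authentication enabled.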
