Skip to content

Minio Kafka Bucket Notifications (with TLS)

Allan Roger Reid edited this page Dec 4, 2024 · 2 revisions

https://kifarunix.com/install-apache-kafka-on-debian https://kifarunix.com/configure-apache-kafka-ssl-tls-encryption

Setup Kafka Box (allanroger.ddns.net)

Get Java

sudo apt update
sudo apt install default-jre -y
java -version

Output

openjdk version "17.0.8.1" 2023-08-24
OpenJDK Runtime Environment (build 17.0.8.1+1-Ubuntu-0ubuntu123.04)
OpenJDK 64-Bit Server VM (build 17.0.8.1+1-Ubuntu-0ubuntu123.04, mixed mode, sharing)

Get Kafka

wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
sudo mkdir /opt/kafka
sudo tar xzf kafka_2.13-3.6.0.tgz -C /opt/kafka --strip-components=1
ls -1 /opt/kafka

Output

bin
config
libs
LICENSE
licenses
NOTICE
site-docs

Running Kafka with Kafta Raft (KRaft)

ls -1 /opt/kafka/config/kraft/

Output

broker.properties
controller.properties
server.properties

Update Logs Directory

sudo mkdir /opt/kafka/logs
sudo sed -i '/^log.dirs/s|/tmp/kraft-combined-logs|/opt/kafka/logs|' /opt/kafka/config/kraft/server.properties

Generate Kafka Cluster ID

/opt/kafka/bin/kafka-storage.sh random-uuid

Output

hRieOfgJTKu-a1XCKMuklQ

Format Kafka Logs Directory to KRaft Format

sudo /opt/kafka/bin/kafka-storage.sh format -t hRieOfgJTKu-a1XCKMuklQ -c /opt/kafka/config/kraft/server.properties

Output

Formatting /opt/kafka/logs with metadata.version 3.6-IV2.

ls -1 /opt/kafka/logs/ Output

bootstrap.checkpoint
meta.properties

Configure Kafka Heap Size

grep KAFKA_HEAP_OPTS= /opt/kafka/bin/kafka-server-start.sh

Output

    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

Generate SSL/TLS Certificates

sudo mkdir /etc/ssl/kafka
sudo openssl genpkey -algorithm RSA -out /etc/ssl/kafka/ca.key
.+.....+...+....+...+.....+.+.....+......+.+........+.........+...+...................+.....+....+...........+....+..+.......+..+...+.+.....+.+...........+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*..+...+...+.+...........+..........+...+......+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*.+...+.......+..+.+........+....+...+...+...........+.....................+....+.....+....+......+......+..+..........+.....+.......+...+......+.........+.....+...+.........+.........+.+.........+..+............+.......+......+.....+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
.....+.+........+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*.+...+.....+.+.....+......+.........+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*.......+...+..+.+........+......+...............+...+...+....+...+.....+.+......+........+...+...+......+...+......+.........+............+...+......+.+.....+.+.....+.......+...+...+..+.........+...+.+.....+............+.............+...........+.+...+..+....+...+........+...+....+......+...........+..........+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
allan@wingzero:~/kafka$ 

Generate CA self-signed certificate

sudo openssl req -x509 -new -key /etc/ssl/kafka/ca.key -days 3650 -out /etc/ssl/kafka/ca.crt

Output

You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:US
State or Province Name (full name) [Some-State]:California
Locality Name (eg, city) []:San Francisco
Organization Name (eg, company): MinIO, Inc
Organizational Unit Name (eg, section) []:Engineering
Common Name (e.g. server FQDN or YOUR name) []:allanroger.ddns.net
Email Address []:

OR

sudo openssl req -x509 -new -key /etc/ssl/kafka/ca.key -days 3560 -out /etc/ssl/kafka/ca.crt \
-subj "/C=US/ST=California/L=San Francisco/O=MinIO, Inc/CN=allanroger.ddns.net/emailAddress=allan@min.io"

Generate Server Private Key and CSR

sudo openssl req -new -newkey rsa:4096 -nodes -keyout /etc/ssl/kafka/server.key \
-out /etc/ssl/kafka/server.csr \
-subj "/C=US/ST=California/L=San Francisco/O=MinIO, Inc/CN=allanroger.ddns.net/emailAddress=allan@min.io"
....+.+...+...+..+.......+.....+.+..+...+............+.......+...........+...+.......+..+.+..+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*............+...........+.+..+...............+.......+...........+....+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*........+..+....+..+............+..........+.....+.....................+....+...........+.........+...+..........+..............+.........+....+..+....+.........+..+...+.............+..+.....................+.+.....+...+.............+............+..+.+.....+...+.........+.......+........................+..+...+.+....................+...+......+......+..........+.....+.+............+............+...+......+..+...+....+...+..............+................+...+......+...+..+...................+...........+.+.....+............+......+..........+...........+...............+....+.....+......+...+.......+.....+.+............+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
.......+.........+...+...+.+...+........+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*.+.......+.....+......+.+.....+.+...+..+...+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*...+........+...............................+...........+......+.......+.....+...+.......+......+.........+...........+.+.........+......+...........+.........+...............+.+.........+........+.......+...+...+..+.+..............+......+......+.+.....+.......+..+.+............+..+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-----

Generate and Sign Server Certificate

sudo vi /etc/ssl/kafka/san.cnf
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
subjectAltName = @alt_names

[alt_names]
DNS.1=allanroger.ddns.net
DNS.2=*.allanroger.ddns.net
sudo openssl x509 -req -in /etc/ssl/kafka/server.csr -CA /etc/ssl/kafka/ca.crt \
-CAkey /etc/ssl/kafka/ca.key -CAcreateserial -out /etc/ssl/kafka/server.crt \
-days 3650 -extfile /etc/ssl/kafka/san.cnf

Output

Certificate request self-signature ok
subject=C = US, ST = California, L = San Francisco, O = "MinIO, Inc", CN = allanroger.ddns.net, emailAddress = allan@min.io

Create Kafka Keystore

Convert Server Certificate to PKCS12

sudo openssl pkcs12 -export \
	-in /etc/ssl/kafka/server.crt \
	-inkey /etc/ssl/kafka/server.key \
	-name kafka-broker \
	-out /etc/ssl/kafka/kafka.p12

Output

Enter Export Password: minioadmin
Verifying - Enter Export Password: minioadmin

Create Kafka Java KeyStore (JKS)

sudo keytool -importkeystore \
	-srckeystore /etc/ssl/kafka/kafka.p12 \
	-destkeystore /etc/ssl/kafka/kafka.keystore.jks \
	-srcstoretype pkcs12

Output

Importing keystore /etc/ssl/kafka/kafka.p12 to /etc/ssl/kafka/kafka.keystore.jks...
Enter destination keystore password:  minioadmin
Re-enter new password: minioadmin
Enter source keystore password:  minioadmin
Entry for alias kafka-broker successfully imported.
Import command completed:  1 entries successfully imported, 0 entries failed or cancelled

Create Kafka TrustStore

sudo keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file /etc/ssl/kafka/ca.crt

Output

Enter keystore password:  minioadmin
Re-enter new password: minioadmin
Owner: EMAILADDRESS=allan@min.io, CN=allanroger.ddns.net, O="MinIO, Inc", L=San Francisco, ST=California, C=US
Issuer: EMAILADDRESS=allan@min.io, CN=allanroger.ddns.net, O="MinIO, Inc", L=San Francisco, ST=California, C=US
Serial number: 4400ad989cf74dd6beff84226a5b35bbdd854135
Valid from: Wed Nov 08 21:51:14 UTC 2023 until: Sun Aug 07 21:51:14 UTC 2033
Certificate fingerprints:
	 SHA1: 26:51:CF:08:91:D3:1D:41:A0:83:16:08:65:73:22:01:E2:B5:EA:46
	 SHA256: C9:E8:5B:0C:EB:A2:42:64:C7:A9:FB:70:72:F2:54:35:E4:27:96:A2:7C:B0:40:B1:50:A9:9B:88:BE:67:2B:85
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 85 DC 2C DF 31 7D EA 07   15 C9 70 5D C3 FB 2A 7A  ..,.1.....p]..*z
0010: D9 DF 3B 98                                        ..;.
]
]

#2: ObjectId: 2.5.29.19 Criticality=true
BasicConstraints:[
  CA:true
  PathLen:2147483647
]

#3: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 85 DC 2C DF 31 7D EA 07   15 C9 70 5D C3 FB 2A 7A  ..,.1.....p]..*z
0010: D9 DF 3B 98                                        ..;.
]
]

Trust this certificate? [no]:  yes
Certificate was added to keystore

Confirm your keystore/trustore details

sudo cp kafka.server.truststore.jks /etc/ssl/kafka
keytool -list -v -keystore /etc/ssl/kafka/kafka.keystore.jks

Output

Enter keystore password:  
Keystore type: PKCS12
Keystore provider: SUN

Your keystore contains 1 entry

Alias name: kafka-broker
Creation date: Nov 8, 2023
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: EMAILADDRESS=allan@min.io, CN=allanroger.ddns.net, O="MinIO, Inc", L=San Francisco, ST=California, C=US
Issuer: EMAILADDRESS=allan@min.io, CN=allanroger.ddns.net, O="MinIO, Inc", L=San Francisco, ST=California, C=US
Serial number: 66c63ecb1864fd87707d83de882a6c490d0c3630
Valid from: Wed Nov 08 21:52:41 UTC 2023 until: Sat Nov 05 21:52:41 UTC 2033
Certificate fingerprints:
	 SHA1: 07:9F:6E:02:C3:BF:ED:9F:90:D8:38:FD:F4:BC:1D:D9:AC:40:75:36
	 SHA256: DA:84:06:F5:9B:A1:8F:A7:32:E2:91:E5:4F:89:65:1C:83:28:2D:94:84:E4:22:BE:39:BF:41:01:64:19:FA:D3
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 4096-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 85 DC 2C DF 31 7D EA 07   15 C9 70 5D C3 FB 2A 7A  ..,.1.....p]..*z
0010: D9 DF 3B 98                                        ..;.
]
]

#2: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
  CA:false
  PathLen: undefined
]

#3: ObjectId: 2.5.29.15 Criticality=false
KeyUsage [
  DigitalSignature
  Non_repudiation
  Key_Encipherment
  Data_Encipherment
]

#4: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
  DNSName: allanroger.ddns.net
  DNSName: *.allanroger.ddns.net
]

#5: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 8E D3 6D 54 DF 91 32 CB   86 55 69 74 2B C7 F5 C7  ..mT..2..Uit+...
0010: 89 08 F9 64                                        ...d
]
]



*******************************************
*******************************************

Confirm your keystore/trustore details

keytool -list -v -keystore kafka.server.truststore.jks

Output

Enter keystore password:  
Keystore type: PKCS12
Keystore provider: SUN

Your keystore contains 1 entry

Alias name: caroot
Creation date: Nov 8, 2023
Entry type: trustedCertEntry

Owner: EMAILADDRESS=allan@min.io, CN=allanroger.ddns.net, O="MinIO, Inc", L=San Francisco, ST=California, C=US
Issuer: EMAILADDRESS=allan@min.io, CN=allanroger.ddns.net, O="MinIO, Inc", L=San Francisco, ST=California, C=US
Serial number: 4400ad989cf74dd6beff84226a5b35bbdd854135
Valid from: Wed Nov 08 21:51:14 UTC 2023 until: Sun Aug 07 21:51:14 UTC 2033
Certificate fingerprints:
	 SHA1: 26:51:CF:08:91:D3:1D:41:A0:83:16:08:65:73:22:01:E2:B5:EA:46
	 SHA256: C9:E8:5B:0C:EB:A2:42:64:C7:A9:FB:70:72:F2:54:35:E4:27:96:A2:7C:B0:40:B1:50:A9:9B:88:BE:67:2B:85
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 85 DC 2C DF 31 7D EA 07   15 C9 70 5D C3 FB 2A 7A  ..,.1.....p]..*z
0010: D9 DF 3B 98                                        ..;.
]
]

#2: ObjectId: 2.5.29.19 Criticality=true
BasicConstraints:[
  CA:true
  PathLen:2147483647
]

#3: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 85 DC 2C DF 31 7D EA 07   15 C9 70 5D C3 FB 2A 7A  ..,.1.....p]..*z
0010: D9 DF 3B 98                                        ..;.
]
]



*******************************************
*******************************************

Configure Apache Kafka SSL/TLS Encryption

sudo vi /opt/kafka/config/kraft/server.properties

Set

############################# Socket Server Settings #############################

listeners=SSL://0.0.0.0:19092,CONTROLLER://0.0.0.0:19093
inter.broker.listener.name=SSL
advertised.listeners=SSL://allanroger.ddns.net:19092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL,SSL:SSL
controller.quorum.voters=1@allanroger.ddns.net:19093


ssl.keystore.location=/etc/ssl/kafka/kafka.keystore.jks
ssl.keystore.password=minioadmin
ssl.key.password=minioadmin
ssl.truststore.location=/etc/ssl/kafka/kafka.server.truststore.jks
ssl.truststore.password=minioadmin
ssl.client.auth=required

Running Kafka Broker as systemd Service

sudo vi /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka
Requires=network.target
After=network.target

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=default.target

Make machine know itself

sudo vi /etc/hosts

Add under 127.0.0.1

allanroger.ddns.net

Reload systemd

sudo systemctl daemon-reload

Test and Validate Kafka SSL/TLS Connection

sudo systemctl restart kafka
sudo systemctl status kafka
journalctl -f -u kafka

Enable Kafka on reboot

sudo systemctl enable kafka

Test Client Topic Creation Over SSL/TLS

In new terminal

cd ~/kafka && vi kafka-client-ssl-test.properties
security.protocol=SSL
ssl.keystore.location=/etc/ssl/kafka/kafka.keystore.jks
ssl.keystore.password=minioadmin
ssl.truststore.location=/etc/ssl/kafka/kafka.server.truststore.jks
ssl.truststore.password=minioadmin

Test the SSL/TLS connection to Kafka

/opt/kafka/bin/kafka-topics.sh --create \
	--topic testssl-topic \
	--bootstrap-server allanroger.ddns.net:19092  \
	--command-config kafka-client-ssl-test.properties

Output

Created topic testssl-topic.

List the topics;

/opt/kafka/bin/kafka-topics.sh --list \
	--bootstrap-server allanroger.ddns.net:19092  \
	--command-config kafka-client-ssl-test.properties

Output

testssl-topic

Note: may need to add 8.8.8.8 entry to resolve.conf i.e. nameserver 8.8.8.8

sudo vi /etc/resolv.conf 

On minio box (kafka-minio.lab.min.dev), install mc and configure kafka

ssh -p 20058 ubuntu@1.2.3.4 -o "ServerAliveInterval=5" -o "ServerAliveCountMax=100000" -o "StrictHostKeyChecking=off"
mkdir ~/mc && cd ~/mc && wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
./mc alias set minio-kafka https://kafka-demo.lab.min.dev:9000 minioadmin minioadmin

Add the Kafka Endpoint to MinIO. Note: use server.crt and server.key created on kafka instance since these are known to the kafka keystore. Another identity could be constructed and added to the keystore but this is enough to demo.

./mc admin config set minio-kafka/ notify_kafka:TLS \
   brokers="allanroger.ddns.net:19092" \
   topic="testssl-topic" \
   tls_client_auth="0" \
   tls="on" \
   tls_skip_verify="on" \
   client_tls_cert="/home/ubuntu/mc/kafka_server.crt" \
   client_tls_key="/home/ubuntu/mc/kafka_server.key" \
   queue_dir="/tmp/minio/events" \
   queue_limit="10" \
   comment="initial"

Obtain ARN from minio startup or from ./mc admin info --json minio-kafka

SQS ARNs: arn:minio:sqs::TLS:kafka 
OR
./mc admin info --json minio-kafka

Create bucket in UI

kafka-demo-tls

Configure bucket notifications

./mc event add minio-kafka/kafka-demo arn:minio:sqs::TLS:kafka --event "put,delete,get,replica,ilm,scanner"

Output

Successfully added arn:minio:sqs::TLS:kafka

OR In UI image

Validate

./mc event ls minio-kafka/kafka-demo-tls arn:minio:sqs::TLS:kafka
./mc admin config get minio-kafka/ notify_kafka

On kafka box start consuming

/opt/kafka/bin/kafka-console-consumer.sh \
	--topic testssl-topic \
	--from-beginning \
	--bootstrap-server allanroger.ddns.net:19092 \
	--consumer.config kafka-client-ssl-test.properties 

Observe events in kafka consumer e.g.

{"EventName":"s3:ObjectRemoved:Delete","Key":"kafka-demo-tls/demo/pics/16760847493_0987a390bb_o.jpg","Records":[{"eventVersion":"2.0","eventSource":"minio:s3","awsRegion":"","eventTime":"2023-11-09T01:35:35.688Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"minioadmin"},"requestParameters":{"principalId":"minioadmin","region":"","sourceIPAddress":"127.0.0.1"},"responseElements":{"content-length":"2086","x-amz-id-2":"dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8","x-amz-request-id":"1795D08C5C847212","x-minio-deployment-id":"c0d5bf4f-8513-436d-a9ba-11cb11349220","x-minio-origin-endpoint":"https://10.212.97.192:9000"},"s3":{"s3SchemaVersion":"1.0","configurationId":"Config","bucket":{"name":"kafka-demo-tls","ownerIdentity":{"principalId":"minioadmin"},"arn":"arn:aws:s3:::kafka-demo-tls"},"object":{"key":"demo%2Fpics%2F16760847493_0987a390bb_o.jpg","sequencer":"1795D08C5DBD1BAB"}},"source":{"host":"127.0.0.1","port":"","userAgent":"MinIO (linux; amd64) minio-go/v7.0.64 MinIO Console/(dev)"}}]}
Clone this wiki locally