Here, we have provisioned two Kafka nodes and two ZooKeeper nodes.
- Create four Linux-based VMs/machines (Ubuntu 17.10, 4-core CPU, 8 GB memory, 250 GB disk).
Follow the steps below to install Java on Ubuntu:
sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk
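You can verify the installation by checking the Java version; it should report an OpenJDK 1.8 build.
java -version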
Set up the appropriate environment variables for Java.
sudo vi /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export JRE_HOME=$JAVA_HOME/jre
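Reload the profile and confirm the variables are set:
source /etc/profile
echo $JAVA_HOME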
Installing Kafka is very simple. You can check for the latest release on the Apache Kafka downloads page.
- First, download the Kafka tarball on all machines
wget https://archive.apache.org/dist/kafka/1.0.1/kafka_2.12-1.0.1.tgz
- Untar it
tar -xvzf kafka_2.12-1.0.1.tgz
- Explore the extracted folder. You will see bin and config folders. These are important folders, as you will be working with them in the next steps.
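A quick listing shows the layout of the distribution (contents may vary slightly by version):
cd kafka_2.12-1.0.1
ls
bin  config  libs  LICENSE  NOTICE  site-docs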
After downloading Kafka, the next step is to update some of its configuration.
- Edit the kafka_2.12-1.0.1/config/server.properties file. Provide the IPs of the ZooKeeper servers (don't worry, we'll configure ZooKeeper in a later step), the log output path, and the broker identifier (remember, it must be a unique ID for each Kafka server).
zookeeper.connect=9.30.42.237:2181,9.30.118.104:2181
log.dirs=/tmp/kafka-logs-0
listeners=PLAINTEXT://:9092
broker.id=0
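On the second Kafka node, the same file would look like the following (a sketch, assuming it also listens on port 9092); broker.id must be unique per broker:
zookeeper.connect=9.30.42.237:2181,9.30.118.104:2181
log.dirs=/tmp/kafka-logs-1
listeners=PLAINTEXT://:9092
broker.id=1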
- Make sure the log output path already exists on the nodes with the right permissions.
mkdir /tmp/kafka-logs-0
drwxr-xr-x 2 root root 4096 Mar 19 17:17 kafka-logs-0
In this step, we will configure the ZooKeeper nodes. ZooKeeper is a high-availability coordination service that Kafka uses for coordination among brokers.
I followed the information provided in the Apache ZooKeeper documentation.
- Edit the kafka_2.12-1.0.1/config/zookeeper.properties file. Provide the data directory path, and make sure the dataDir directory exists on the nodes.
dataDir=/tmp/zookeeper-0
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
initLimit=5
syncLimit=2
server.123=9.30.118.104:2888:3888
server.125=0.0.0.0:2888:3888
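On the other ZooKeeper node, the server entries are mirrored, since each node refers to itself as 0.0.0.0. Assuming 9.30.42.237 is the node with ID 125 (as the zookeeper.connect setting above suggests), its peer's config would contain:
server.123=0.0.0.0:2888:3888
server.125=9.30.42.237:2888:3888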
To let every ZooKeeper node know about every other node in the ensemble, we provide IDs, IPs, and ports as follows.
server.123=9.30.118.104:2888:3888
server.125=0.0.0.0:2888:3888
123 and 125 are unique identifiers provided in the dataDir/myid file on each node. The myid file consists of a single line containing the node's ID.
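For example, the myid files can be created like this (assuming dataDir=/tmp/zookeeper-0 as above):
# on the node with ID 125
echo 125 > /tmp/zookeeper-0/myid
# on the node with ID 123
echo 123 > /tmp/zookeeper-0/myid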
2888 is the port used by followers to connect to the leader, and 3888 is used for leader election. (Make sure these ports are open on the nodes.)
- Start ZooKeeper as a daemon process on the ZooKeeper nodes
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- Run Kafka as a daemon process on the Kafka nodes
nohup bin/kafka-server-start.sh config/server.properties &
- Verify that the brokers have registered with ZooKeeper
./bin/zookeeper-shell.sh 9.30.42.237:2181 <<< "ls /brokers/ids"
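If both brokers registered successfully, the shell output should end with a line listing their broker IDs:
[0, 1]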
Create a topic on the Kafka cluster (containing two Kafka servers) with two partitions, replicating each partition across the two Kafka brokers.
./bin/kafka-topics.sh --create --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --topic test-topic --partitions 2 --replication-factor 2
Created topic "test-topic".
Note that under each Kafka node's log path, two partition directories have been created, test-topic-0 and test-topic-1, on both nodes.
Create a topic on the Kafka cluster (containing two Kafka servers) with two partitions and no replication.
./bin/kafka-topics.sh --create --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --topic non-repeat-topic --partitions 2 --replication-factor 1
Created topic "non-repeat-topic".
With no replication, each node holds only one of the two partition directories: non-repeat-topic-0 on one Kafka node and non-repeat-topic-1 on the other.
Get the list of topics on the Kafka cluster.
./bin/kafka-topics.sh --list --zookeeper 9.30.42.237:2181,9.30.118.10:2181
test-topic
non-repeat-topic
Describe the test-topic and non-repeat-topic topics.
./bin/kafka-topics.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --describe --topic test-topic
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
Here, Leader: 1 means the Kafka broker with ID 1 acts as the leader and handles all reads and writes for partition 0. Replicas: 1,0 means partition 0 is replicated on brokers 1 and 0.
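Describing non-repeat-topic would produce output like the following, with each partition hosted on a single broker (the actual leader assignments may differ):
./bin/kafka-topics.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --describe --topic non-repeat-topic
Topic: non-repeat-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: non-repeat-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1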
Delete the test-topic topic from the Kafka cluster.
To enable deletion, we need to make sure delete.topic.enable=true is set in the server.properties config file on all Kafka brokers.
./bin/kafka-topics.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --delete --topic test-topic
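Listing the topics again should confirm that test-topic is gone:
./bin/kafka-topics.sh --list --zookeeper 9.30.42.237:2181,9.30.118.10:2181
non-repeat-topic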
Run a producer process to publish data to the test-topic Kafka topic.
bin/kafka-console-producer.sh --broker-list 9.30.118.212:9092,9.30.214.93:9092 --topic test-topic
>
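Each line typed at the producer prompt is published to the topic as a separate message, for example:
> first message
> second message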
Edit kafka_2.12-1.0.1/config/consumer.properties to set the group ID to test-consumer-group-0, and create a second config file, consumer.properties1, with group ID test-consumer-group-1.
# consumer group id
group.id=test-consumer-group-1
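A quick way to create the second config is to copy the first and change the group ID (a sketch, assuming GNU sed):
cp config/consumer.properties config/consumer.properties1
sed -i 's/^group.id=.*/group.id=test-consumer-group-1/' config/consumer.properties1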
Run a consumer process to consume the published data from a topic.
bin/kafka-console-consumer.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --from-beginning --topic topic-new
First, run a producer process to push data to a topic-new topic.
bin/kafka-console-producer.sh --broker-list 9.30.118.212:9092,9.30.214.93:9092 --topic topic-new
Run two consumer processes in the same consumer group by running the command below in two separate terminals.
bin/kafka-console-consumer.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --topic topic-new --consumer.config config/consumer.properties
Let's add messages in the producer console. Each message is consumed by only one of the two consumer processes, with the work split between them.
Note:
Each message is guaranteed to be read by only one consumer in a consumer group. In other words, data pushed to a Kafka topic is processed only once within a consumer group; the processing is distributed among the consumer processes in the group.
First, run a producer process to push data to a cast-topic topic.
bin/kafka-console-producer.sh --broker-list 9.30.118.212:9092,9.30.214.93:9092 --topic cast-topic
Run two consumer processes in two separate consumer groups. We need the two consumer.properties config files covered above.
bin/kafka-console-consumer.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --topic cast-topic --consumer.config config/consumer.properties
bin/kafka-console-consumer.sh --zookeeper 9.30.42.237:2181,9.30.118.10:2181 --topic cast-topic --consumer.config config/consumer.properties1
Now, push the messages [one, two, three, four, five, six] to cast-topic through the producer process. Both consumer processes receive all six messages.
Note:
Here, consumers from different consumer groups receive all the messages on a Kafka topic. That is, if multiple consumer groups subscribe to a topic, Kafka broadcasts the messages to each group.
- What happens if all the Kafka brokers die?
The consumer pulling data from the topic gets an error like:
Error while fetching metadata with correlation id 43 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Kafka is a publish-subscribe messaging system that can be used to exchange data between processes, applications, and services. It has built-in partitioning, replication, and fault tolerance.
As we have seen above, Kafka can scale processing (by distributing data processing among the consumer processes in a consumer group) and supports multiple subscribers (by broadcasting messages across consumer groups).