This application:
- Reads a topic of comma-delimited userid,colour records
- Filters out bad data, i.e. keeps only the colours green, red and blue
- Maintains a running count of each favourite colour overall and writes it to an output topic
It is developed using Java 8 and Kafka Streams 2.7.0.
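The filtering and counting described above can be sketched in plain Java, without Kafka Streams. This is a minimal illustration, not the application's actual topology. It assumes that each user has one current favourite, so a later record for the same userid replaces the earlier one. The class and method names (`FavouriteColourSketch`, `countColours`) are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper: a plain-Java stand-in for the streaming logic.
final class FavouriteColourSketch {

    // Colours the application keeps; anything else is treated as bad data.
    private static final Set<String> ALLOWED =
            new HashSet<>(Arrays.asList("green", "red", "blue"));

    /** Replays "userid,colour" records in order and returns the count per colour. */
    static Map<String, Long> countColours(List<String> records) {
        // Assumption: the latest valid colour per user wins.
        Map<String, String> latestByUser = new HashMap<>();
        for (String record : records) {
            String[] parts = record.split(",", 2);
            if (parts.length != 2) {
                continue; // malformed line without a separator
            }
            String user = parts[0];
            String colour = parts[1];
            if (!user.isEmpty() && ALLOWED.contains(colour)) {
                latestByUser.put(user, colour);
            }
        }
        // Count users per colour; TreeMap keeps the printed order stable.
        Map<String, Long> counts = new TreeMap<>();
        for (String colour : latestByUser.values()) {
            counts.merge(colour, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "stephane,blue", "john,green", "stephane,red", "alice,red");
        // stephane switches from blue to red, so blue ends with no users.
        System.out.println(countColours(records)); // prints {green=1, red=2}
    }
}
```

In this sketch a colour with no remaining users is simply absent from the result. A streaming implementation may instead emit an updated count for that colour.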
Set up a Kafka cluster on your local machine. Refer to the Kafka Cluster Setup section for help.
Create the input and output topics (the output topic is compacted)
bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-colour-input
bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact
Start this application
Produce messages with the userid as key and the colour as value using the Kafka console producer
bin/ --broker-list localhost:9092 --topic favourite-colour-input --property parse.key=true --property key.separator=,
i.e.
bin/ --broker-list localhost:9092 --topic favourite-colour-input --property parse.key=true --property key.separator=,
>stephane,blue
>john,green
>stephane,red
>alice,red
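With parse.key=true and key.separator=, each typed line is split into a key and a value. The sketch below illustrates that split in plain Java, assuming the line is divided at the first occurrence of the separator. `KeySeparatorSketch` and `splitLine` are hypothetical names, not part of the Kafka tooling.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper showing how a "key,value" line could be split.
final class KeySeparatorSketch {

    /** Splits a line at the first occurrence of the separator into key and value. */
    static Map.Entry<String, String> splitLine(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            throw new IllegalArgumentException("No key separator found in line: " + line);
        }
        String key = line.substring(0, idx);
        String value = line.substring(idx + separator.length());
        return new SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> record = splitLine("stephane,blue", ",");
        System.out.println(record.getKey() + " -> " + record.getValue()); // prints stephane -> blue
    }
}
```

Because the userid becomes the record key, all colours sent by one user share the same key.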
Consume messages from the output topic
bin/ --bootstrap-server localhost:9092 \
    --topic favourite-colour-output \
    --from-beginning \
    --formatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

red 5
red 5
green 3
red 4
blue 2
blue 3
blue 2
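The value deserializer above is LongDeserializer because the counts are stored as longs rather than as text. Kafka's long serialization uses 8 bytes in big-endian order. The sketch below mirrors that layout with `java.nio.ByteBuffer` instead of the Kafka classes, so `LongValueSketch`, `encode` and `decode` are illustrative names only.

```java
import java.nio.ByteBuffer;

// Hypothetical helper mirroring the 8-byte big-endian layout of a long value.
final class LongValueSketch {

    /** Encodes a count as 8 bytes; ByteBuffer is big-endian by default. */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes 8 big-endian bytes back into a count. */
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes but got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(5L);
        System.out.println(raw.length);  // prints 8
        System.out.println(decode(raw)); // prints 5
    }
}
```

Reading these bytes with a string deserializer would print unreadable characters, which is why the console consumer needs the value deserializer set explicitly.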
Kafka Cluster Setup

Download Kafka from
Extract it by running the tar command in a terminal
e.g. tar -xvf kafka_2.13-2.6.0.tgz
Go to the extracted Kafka folder
e.g. cd kafka_2.13-2.6.0/
Start ZooKeeper
bin/ config/
This brings up ZooKeeper on the default port 2181 configured in
file
Start the first broker/node
bin/ config/
This starts the broker with the following default broker id, log directory and port configured in
config/
log.dirs=/tmp/kafka-logs
port=9092
Create a copy of
file for the second broker/node
i.e. cp config/ config/
Change the broker id, log directory and port in
config/
log.dirs=/tmp/kafka-logs-1
port=9093
Start the second broker/node
bin/ config/
Create one more copy of
file for the third broker/node
i.e. cp config/ config/
Change the broker id, log directory and port in
config/
log.dirs=/tmp/kafka-logs-2
port=9094
Start the third broker/node
bin/ config/
Check which brokers are up and running
bin/ localhost:2181 ls /brokers/ids
This gives the following output:
Connecting to localhost:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]