There are 4 key components in Kafka.
Producer: Producers publish messages to one or more Kafka topics by sending data to Kafka brokers.
Topics: A stream of messages belonging to a particular category is called a topic. Data is stored in topics.
Broker: Brokers are the servers responsible for maintaining the published data.
Consumer: Consumers subscribe to one or more topics and consume published messages by pulling data from the brokers.
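To make the relationship between the four components concrete, here is a minimal in-memory sketch. It is not Kafka code: the names ToyBroker, ToyProducer and ToyConsumer are hypothetical, and the sketch ignores partitions, replication and persistence. It only illustrates that a broker stores messages per topic and that a consumer pulls from its own offset.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker: keeps an append-only log per topic."""

    def __init__(self):
        self.logs = defaultdict(list)  # topic name -> list of messages

    def append(self, topic, message):
        self.logs[topic].append(message)

    def fetch(self, topic, offset):
        # Return every message stored at or after the given offset.
        return self.logs[topic][offset:]


class ToyProducer:
    def __init__(self, broker):
        self.broker = broker

    def send(self, topic, message):
        self.broker.append(topic, message)


class ToyConsumer:
    def __init__(self, broker, topic):
        self.broker = broker
        self.topic = topic
        self.offset = 0  # position of the next message to read

    def poll(self):
        # The consumer pulls whatever is new since its last poll.
        messages = self.broker.fetch(self.topic, self.offset)
        self.offset += len(messages)
        return messages


broker = ToyBroker()
producer = ToyProducer(broker)
consumer = ToyConsumer(broker, "TutorialTopic")

producer.send("TutorialTopic", "hello")
producer.send("TutorialTopic", "world")
first_poll = consumer.poll()   # both messages
second_poll = consumer.poll()  # nothing new since the first poll
```

Because the offset lives in the consumer, a second consumer starting at offset 0 would read the same two messages again.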
- Install Java
- Install Zookeeper
- Extract Kafka Binaries
- Configure & Start Kafka Server
nohup ~/kafka/bin/ ~/kafka/config/ > ~/kafka/kafka.log 2>&1 &
- Test Kafka with a message from Producer to Consumer
Start a new topic called TutorialTopic
Enter some strings as messages.
~/kafka/bin/ --broker-list localhost:9092 --topic TutorialTopic
Or ask it to read from a file, using the same producer script with these arguments:
--broker-list localhost:9092 --topic TutorialTopic --new-producer < my_file.txt
Start Consumer in a new terminal to receive messages
~/kafka/bin/ --zookeeper localhost:2181 --topic TutorialTopic --from-beginning
Messages should be received in this terminal
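The file-based producer step above can also be sketched in Python with kafka-python. This is an illustration rather than a replacement for the console scripts: send_file and run_against_local_broker are hypothetical helper names, and the broker address localhost:9092, the topic TutorialTopic and the file my_file.txt are taken from the commands above.

```python
def send_file(producer, topic, path):
    """Send each non-empty line of a text file as one message; return the count."""
    sent = 0
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            text = line.rstrip("\n")
            if not text:
                continue  # skip blank lines
            # kafka-python expects bytes unless a value_serializer is configured
            producer.send(topic, text.encode("utf-8"))
            sent += 1
    producer.flush()  # block until buffered messages have been sent
    return sent


def run_against_local_broker():
    # Needs `pip install kafka-python` and a broker listening on localhost:9092.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    return send_file(producer, "TutorialTopic", "my_file.txt")
```

Calling run_against_local_broker() with the console consumer still open should show one message per line of the file.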
- Install KafkaT by Airbnb to manage Kafka
Kafka Manager is a separate Kafka management tool created by Yahoo.
It is necessary to install sbt as a prerequisite, which includes Scala.
If there are issues with the sbt clean dist command, look at reinstalling Java (javac is likely missing) by using the link below.
Extract the created zip file to ~/. To start kafka-manager:
cd kafka-manager-1.1*   # go to the extracted folder
bin/kafka-manager       # launch the server
Then open the server's address in a browser.
It is possible to have Kafka monitor a single file for updates through the FileStream connector.
This connector watches a directory for files and reads the data as new files are written to the input directory.
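To illustrate what monitoring a file for updates involves, here is a minimal Python sketch that returns only the lines appended since the last read. It is not the connector's implementation; read_new_lines and the demo file name are hypothetical, and a real connector would also persist the position so it survives restarts.

```python
import os
import tempfile


def read_new_lines(path, position):
    """Return (lines, new_position) for complete lines appended after `position`."""
    with open(path, "rb") as handle:
        handle.seek(position)
        chunk = handle.read()
    # Consume only up to the last newline so a half-written line waits for the next call.
    end = chunk.rfind(b"\n") + 1
    lines = chunk[:end].decode("utf-8").splitlines()
    return lines, position + end


demo_path = os.path.join(tempfile.mkdtemp(), "input.txt")
with open(demo_path, "wb") as handle:
    handle.write(b"first\nsecond\n")
lines, position = read_new_lines(demo_path, 0)

with open(demo_path, "ab") as handle:
    handle.write(b"third\n")
more_lines, position = read_new_lines(demo_path, position)
```

Each returned line could then be sent to a topic as one message.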
pip install kafka-python
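from kafka import KafkaConsumer
topic = "TutorialTopic"  # assumed: the topic created earlier
consumer = KafkaConsumer(topic, bootstrap_servers="192.168.0.x", auto_offset_reset='earliest', enable_auto_commit=False)
for msg in consumer:
    # msg.value is raw bytes, so decode it before splitting into lines
    log = msg.value.decode('utf-8').split('\n')
    for n, line in enumerate(log):
        print(n, line)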
Other Messages in Consumer
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
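Since keys and values arrive as raw bytes, a small decoding helper can keep the printing loop tidy. The sketch below is an assumption-laden illustration: Record is a hypothetical stand-in that only mirrors the field names used above, and decode and describe are helper names introduced here, not part of kafka-python.

```python
from collections import namedtuple

# Hypothetical stand-in with the same field names as the messages above
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


def decode(raw, encoding="utf-8"):
    """Decode bytes to text; pass None (for example a missing key) through unchanged."""
    return raw if raw is None else raw.decode(encoding)


def describe(message):
    """Format one message in the same layout as the print statement above."""
    return "%s:%d:%d: key=%s value=%s" % (
        message.topic, message.partition, message.offset,
        decode(message.key), decode(message.value),
    )


sample = Record("TutorialTopic", 0, 7, None, b"hello")
summary = describe(sample)
```

Inside the consumer loop, print(describe(message)) would then show decoded text instead of byte literals.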
Get list of topics
topic = "*"
consumer = KafkaConsumer(topic, bootstrap_servers="192.XXX.X.X", auto_offset_reset='earliest', enable_auto_commit=False)
print(consumer.topics())
>>> set([u'z1', u'test_topic', u'Z1'])
Submit from the command line. The spark-streaming-kafka jar is downloaded to a local folder the first time and reused from there on later runs.
Remember to update the Scala and Spark versions in the package name (2.10 and 1.6.1 here) to match your installation.
spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.1
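In the package name above, 2.10 is the Scala version the artifact was built for and 1.6.1 is the Spark version. As a small sketch, the coordinate can be assembled from those two values. kafka_package is a hypothetical helper, and the naming pattern is only assumed to hold for the Spark 1.6.x line, since later Spark releases use different artifact names.

```python
def kafka_package(scala_version, spark_version):
    """Build the Maven coordinate passed to spark-submit --packages."""
    return "org.apache.spark:spark-streaming-kafka_%s:%s" % (scala_version, spark_version)


coordinate = kafka_package("2.10", "1.6.1")
```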
Python script
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
start = time.time()
# Spark Streaming
# ----------------------------------------
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
ssc = StreamingContext(sc, 1) # listen every 1 second
# ssc = spark-streaming context, zkQuorum = zookeeper, groupid = kafka topic group-id, topics = topicname:#partitions
kafkaStream = KafkaUtils.createStream(ssc=ssc, zkQuorum='192.168.0.x:2181', groupId='x1', topics={'x1':1})
parsed = kafkaStream.map(lambda x: x)
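Each element of the stream returned by KafkaUtils.createStream is a (key, value) pair, so a mapping function usually unpacks the value first. The sketch below shows one possible mapping function, tested here on a plain list instead of a live stream. parse_record is a hypothetical name, and treating messages as JSON is an assumption, not something these notes specify.

```python
import json


def parse_record(record):
    """Turn one (key, value) pair from the stream into a Python object."""
    _key, value = record
    try:
        return json.loads(value)
    except ValueError:
        # Not JSON: keep the raw text rather than dropping the message
        return value


sample_batch = [
    (None, '{"level": "INFO", "msg": "started"}'),
    (None, "plain text line"),
]
parsed_batch = [parse_record(record) for record in sample_batch]
```

On the real stream this would be used as kafkaStream.map(parse_record).pprint(), followed by ssc.start() and ssc.awaitTermination() so that the streaming job actually runs.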