Write Kafka consumers in a model with retry
Add this line to your application's Gemfile:
gem 'simple_kafka_consumer'
And then execute:
$ bundle
Or install it yourself as:
$ gem install simple_kafka_consumer
You will want to write your own consumer class that inherits from SimpleKafkaConsumer::Consumer
. You will want to specify the group_name
and topic_name
. You'll also want to define the consume
method which is the handler for batch of messages received.
class MyConsumer < SimpleKafkaConsumer::Consumer
# the name used for coordinating multiple consumers
self.group_name = "my-group-name"
# the kafka topic we're reading from
self.topic_name = "my-topic-name"
# handle the messages
def consume(message)
puts message
end
end
You can have the consumer handle deserializing your data that is sent in the message. For example, if you used json as your message format:
class MyConsumer
def parse(message)
JSON.parse(message)
end
# the message you're consuming is now a parsed json object
def consume(json_object)
puts json_object['name']
end
end
To create a consumer instance, you'll need to provide an array of kafka servers and an array of zookeeper servers. You can optionally provide a logger as well.
# create a consumer
kafka_servers = ["localhost:9092"]
zookeeper_servers = ["localhost:2181"]
consumer = MyConsumer.new(kafka_servers, zookeeper_servers, logger: nil)
# run the consumer (loops and blocks)
consumer.run
This gem utilizes the poseidon_cluster
gem and consumers coordinate via zookeeper. Thus, you can run many consumers. The group_name
is what's used to determine which messages have already been processed.
- Fork it ( https://github.com/chingor13/simple_kafka_consumer/fork )
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request