High-level Kafka consumer for Ruby. Uses Zookeeper to manage load balancing, failover, and offset management.
A consumer group consists of multiple instances of the same consumer. Every instance registers itself in Zookeeper. Based on the number of instances that are registered, an instance will start consuming some or all of the partitions of the topics it wants to consume.
The distribution algorithm makes sure that every partition is consumed by only one consumer instance at a time. It uses Zookeeper watches to be notified when consumer instances come online or go offline, which triggers a redistribution of all the partitions being consumed.
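To make the idea concrete, here is a minimal sketch of one way to split partitions over the registered instances so that each partition has exactly one owner. It is an illustration only, not necessarily the algorithm this gem uses; `assign_partitions` and the instance names are hypothetical.

```ruby
# Hypothetical helper for illustration; not part of kafka-consumer's API.
# Gives every instance a contiguous slice of the sorted partition list, so
# each partition is owned by exactly one instance.
def assign_partitions(partitions, instances)
  return {} if instances.empty?

  sorted_partitions = partitions.sort
  sorted_instances = instances.sort
  base, extra = sorted_partitions.size.divmod(sorted_instances.size)

  assignment = {}
  start = 0
  sorted_instances.each_with_index do |instance, index|
    # The first `extra` instances take one additional partition.
    count = base + (index < extra ? 1 : 0)
    assignment[instance] = sorted_partitions[start, count]
    start += count
  end
  assignment
end

partitions = (0...8).to_a

# Two instances registered: each gets four partitions.
before = assign_partitions(partitions, ["consumer-a", "consumer-b"])

# A third instance comes online: every instance recomputes its share.
after = assign_partitions(partitions, ["consumer-a", "consumer-b", "consumer-c"])
```

Because every instance sorts the same inputs and runs the same deterministic function, they all arrive at the same assignment without talking to each other directly; they only need to agree on the list of registered instances.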
Periodically, it commits the last processed offset of every partition to Zookeeper. Whenever a new consumer starts, it resumes consuming every partition at the last committed offset. This implements an at-least-once guarantee, so you may end up consuming the same message more than once, for example when a consumer stops after processing messages but before committing their offsets. It's your responsibility to deal with this if that is a problem for you, e.g. by using idempotent operations.
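One possible way to make processing idempotent is to remember which (topic, partition, offset) triples have already been handled and skip repeats. The sketch below assumes a message exposes `topic`, `partition`, `offset`, and `value`; the `Message` struct and `IdempotentProcessor` class are hypothetical stand-ins, and the in-memory set would have to be replaced by durable storage to survive restarts.

```ruby
require "set"

# Stand-in for a consumed message; the field names are assumptions.
Message = Struct.new(:topic, :partition, :offset, :value)

# Hypothetical wrapper that ignores messages it has already handled.
class IdempotentProcessor
  attr_reader :results

  def initialize
    @seen = Set.new
    @results = []
  end

  # Returns true if the message was processed, false if it was a duplicate.
  def process(message)
    key = [message.topic, message.partition, message.offset]
    # Set#add? returns nil when the key is already present.
    return false unless @seen.add?(key)

    @results << message.value
    true
  end
end

processor = IdempotentProcessor.new
first = Message.new("access_log", 0, 41, "GET /")
second = Message.new("access_log", 0, 42, "GET /about")

# Simulate a redelivery of offset 42 after a restart.
outcomes = [first, second, second].map { |message| processor.process(message) }
```

In practice, the same effect is often achieved by making the write itself idempotent, for instance an upsert keyed on the message's identity.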
First, add kafka-consumer to your Gemfile, and run bundle install. If your messages are snappy-compressed, add the snappy gem as well.
zookeeper = "zk1:2181,zk2:2181,zk3:2181"
name = "access-log-processor"
topics = ["access_log"]

consumer = Kafka::Consumer.new(name, topics, zookeeper: zookeeper)

# Interrupt the consumer when the process receives SIGINT (Ctrl-C).
Signal.trap("INT") { consumer.interrupt }

consumer.each do |message|
  # process message
end
- It will spawn a manager thread and two threads per partition. However, your code doesn't have to be thread-safe because every message is yielded to each sequentially using a mutex.
- On my MacBook Pro, I can consume around 10,000 messages per second from a topic with 64 partitions on a production Kafka cluster.
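The mutex-guarded yield described above can be pictured with the following sketch. It is not the gem's implementation: `each_sequentially` is a hypothetical helper, and plain arrays stand in for per-partition message streams. Several threads produce messages, but the caller's block only ever runs on one of them at a time.

```ruby
# Hypothetical sketch; arrays stand in for per-partition message streams.
def each_sequentially(partition_batches, &block)
  mutex = Mutex.new
  threads = partition_batches.map do |batch|
    Thread.new do
      batch.each do |message|
        # Only one thread at a time may run the caller's block.
        mutex.synchronize { block.call(message) }
      end
    end
  end
  threads.each(&:join)
end

processed = []
active = 0
max_active = 0

each_sequentially([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) do |message|
  active += 1
  max_active = [max_active, active].max
  processed << message # safe without extra locking: the block never overlaps
  active -= 1
end
```

Messages from different partitions may interleave in any order, but within one partition the order is preserved, and the block body never runs concurrently with itself.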
- Fork it ( https://github.com/wvanbergen/kafka-consumer/fork )
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request