Net::Kafka - High-performance Perl client for Apache Kafka
use Net::Kafka::Producer;
use Net::Kafka::Consumer;
use AnyEvent;
use feature 'say';
# Produce 1 message into "my_topic"
my $condvar = AnyEvent->condvar;
my $producer = Net::Kafka::Producer->new(
'bootstrap.servers' => 'localhost:9092'
);
$producer->produce(
payload => "message",
topic => "my_topic"
)->then(sub {
my $delivery_report = shift;
$condvar->send;
print "Message successfully delivered with offset " . $delivery_report->{offset};
}, sub {
my $error = shift;
$condvar->send;
die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
});
$condvar->recv;
# Consume message from "my_topic"
my $consumer = Net::Kafka::Consumer->new(
'bootstrap.servers' => 'localhost:9092',
'group.id' => 'my_consumer_group',
'enable.auto.commit' => 'true',
);
$consumer->subscribe( [ "my_topic" ] );
while (1) {
my $msg = $consumer->poll(1000);
if ($msg) {
if ( $msg->err ) {
say "Error: ", Net::Kafka::Error::to_string($msg->err);
}
else {
say $msg->payload;
}
}
}
This module provides Perl bindings to the librdkafka C client library. It is heavily inspired by the Kafka::Librd module originally developed by Pavel Shaydo.
Please refer to the documentation of the following modules to understand how to use it:
- Net::Kafka::Producer - asynchronous producer interface
- Net::Kafka::Consumer - consumer interface that supports both Simple and Distributed modes
- GNU make
- librdkafka >= 1.0.0
First install librdkafka (https://github.com/edenhill/librdkafka#installation).
cpanm Net::Kafka
Sources are available on GitHub: https://github.com/bookingcom/perl-Net-Kafka.
perl Makefile.PL
make
make test
make install
The Net::Kafka::Producer module provides an interface to librdkafka's producer methods. It uses signal pipes, an AnyEvent watcher and AnyEvent::XSPromises to make its behaviour asynchronous. Because of that, you need to create a condvar and call send/recv on it in order to collect all outstanding promises.
It is highly recommended to familiarize yourself with both the AnyEvent and AnyEvent::XSPromises modules. See "SYNOPSIS" for an example.
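The following is a minimal sketch of collecting several outstanding promises with a single condvar. It relies only on the produce/then calls shown in the SYNOPSIS and on the begin/end counters of an AnyEvent condvar. The helper name produce_batch and the shape of its return value are hypothetical and not part of Net::Kafka; the helper accepts any producer object with a produce method, so treat it as an illustration rather than the module's own API.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): queue every payload and
# record each outcome. $cv is expected to be an AnyEvent condvar; the
# begin/end pairs keep it pending until every promise has settled.
sub produce_batch {
    my ($producer, $cv, $topic, @payloads) = @_;
    my %result = (delivered => [], failed => []);

    $cv->begin;    # outer guard: prevents an early send while still queueing
    for my $payload (@payloads) {
        $cv->begin;
        $producer->produce(
            topic   => $topic,
            payload => $payload,
        )->then(
            sub {
                my $delivery_report = shift;
                push @{ $result{delivered} }, $delivery_report->{offset};
                $cv->end;
            },
            sub {
                my $error = shift;
                push @{ $result{failed} }, $error->{error};
                $cv->end;
            },
        );
    }
    $cv->end;      # releases the outer guard

    return \%result;
}

# Intended usage with a real producer and event loop:
#   my $cv     = AnyEvent->condvar;
#   my $result = produce_batch($producer, $cv, "my_topic", "a", "b", "c");
#   $cv->recv;    # returns once all three promises have settled
```

The outer begin/end pair is the usual AnyEvent idiom for this pattern: without it, a promise that settles before the loop finishes could bring the counter to zero too early.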
-
new()
my $producer = Net::Kafka::Producer->new( 'bootstrap.servers' => 'localhost:9092' );
Creates an instance of Net::Kafka::Producer. Accepts a hash whose keys are librdkafka property names (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that only the error_cb and stats_cb callbacks are supported for Producer. Message delivery reports are served automatically through the Promise-based produce method (see below).
-
produce()
my $promise = $producer->produce(
    payload   => "my_message",
    topic     => "my_topic",
    key       => "my_key",     # optional
    timestamp => 1234567,      # optional, if not specified current local timestamp will be used
    partition => 0,            # optional, if not specified internal librdkafka partitioner will be used
    headers   => $headers,     # optional, see Net::Kafka::Headers
)->then(sub {
    my $delivery_report = shift;
    print "Message is sent with offset " . $delivery_report->{offset};
})->catch(sub {
    my $error = shift;
    print $error->{error} . "\n";
});
Sends a message to Kafka. Accepts a hash with parameters.
Returns an instance of Promise that will be resolved or rejected later. If the message is successfully sent, the "resolve" callback receives a delivery report in the form of a hash that contains offset, partition and timestamp. If message delivery fails, the "reject" callback receives a hash that contains error (a human-readable error description) and (optionally) error_code, which is equal to librdkafka's error code. All error codes are mapped and exported by the Net::Kafka module as constants (e.g. Net::Kafka::RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS) for simplicity.
-
partitions_for()
my $partitions = $producer->partitions_for("my_topic", $timeout_ms);
Returns an ARRAYREF that contains partition metadata information about the given topic (leader, replicas, ISR replicas).
-
close()
$producer->close();
Explicitly closes the Net::Kafka::Producer instance and the underlying librdkafka handles.
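As an illustration of the hashes described under produce(), here is a small sketch that turns a delivery report or a rejection hash into a log line. The helper names describe_delivery and describe_failure are hypothetical. The sketch follows the error_code key named in the produce() description and treats it as optional, as the text says.

```perl
use strict;
use warnings;

# Hypothetical helpers (not part of Net::Kafka) that format the hashes
# passed to the "resolve" and "reject" callbacks of produce().

sub describe_delivery {
    my ($report) = @_;
    return sprintf "delivered to partition %d at offset %d (timestamp %s)",
        $report->{partition}, $report->{offset},
        $report->{timestamp} // 'n/a';
}

sub describe_failure {
    my ($error) = @_;
    my $text = $error->{error} // 'unknown error';
    # The numeric code is optional, so append it only when it is present.
    return defined $error->{error_code}
        ? "$text (librdkafka code $error->{error_code})"
        : $text;
}

# Intended usage:
#   $producer->produce(payload => "message", topic => "my_topic")->then(
#       sub { print describe_delivery(shift), "\n" },
#       sub { warn  describe_failure(shift),  "\n" },
#   );
```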
The Net::Kafka::Consumer class provides an interface to librdkafka's consumer functionality. It supports both "distributed" (subscription-based) and "simple" (manual partition assignment) modes of work.
-
new()
my $consumer = Net::Kafka::Consumer->new(
    'bootstrap.servers'  => 'localhost:9092',
    'group.id'           => "my_consumer_group",
    'enable.auto.commit' => "true",
);
Creates an instance of Net::Kafka::Consumer. Accepts a hash whose keys are librdkafka property names (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that not all callbacks are supported at the moment. The supported ones are error_cb, rebalance_cb, commit_cb and stats_cb.
-
subscribe()
$consumer->subscribe([ 'my_topic' ]);
Subscribes to a set of topics using balanced consumer groups. This is the main entry point for "distributed" consumer mode: partitions are assigned automatically using Kafka's GroupApi semantics. Wildcard/regex topics are supported, so matching topics will be added to the subscription list.
-
unsubscribe()
$consumer->unsubscribe();
Unsubscribe from the current subscription set.
-
assign()
# manually assign partitions 0 and 1 to be consumed
my $tp_list = Net::Kafka::TopicPartitionList->new();
$tp_list->add("my_topic", 0);
$tp_list->add("my_topic", 1);
$consumer->assign($tp_list);
Atomic assignment of partitions to consume. The main entry-point for "simple" consumer mode - partitions are assigned manually.
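One way to use manual assignment is static sharding, where each worker process takes a fixed subset of partitions. The sketch below is only an illustration of that idea: partitions_for_worker and assign_partitions are hypothetical helper names, and the modulo scheme is an assumption of this example, not something the module prescribes. The helpers only call add() and assign() as documented above.

```perl
use strict;
use warnings;

# Hypothetical helper: worker number $worker_index (0-based) out of
# $worker_count takes every partition whose number is congruent to its index.
sub partitions_for_worker {
    my ($partition_count, $worker_index, $worker_count) = @_;
    return grep { $_ % $worker_count == $worker_index } 0 .. $partition_count - 1;
}

# Hypothetical helper: add the chosen partitions to a topic-partition list
# (for example a Net::Kafka::TopicPartitionList) and pass it to assign().
sub assign_partitions {
    my ($consumer, $tp_list, $topic, @partitions) = @_;
    $tp_list->add($topic, $_) for @partitions;
    $consumer->assign($tp_list);
    return scalar @partitions;
}

# Intended usage (worker 1 of 3 on a topic assumed to have 6 partitions):
#   my $tp_list = Net::Kafka::TopicPartitionList->new();
#   assign_partitions($consumer, $tp_list, "my_topic",
#       partitions_for_worker(6, 1, 3));
```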
-
poll()
my $message = $consumer->poll($timeout_ms);
Polls the consumer for messages or events. Returns an instance of Net::Kafka::Message. Blocks for at most timeout_ms milliseconds. An application should make sure to call poll at regular intervals.
-
committed()
my $tp_list = Net::Kafka::TopicPartitionList->new();
$tp_list->add("my_topic", 0);
$consumer->committed($tp_list);
my $offset = $tp_list->offset("my_topic", 0);
Retrieve committed offsets for topics+partitions.
-
offsets_for_times()
my $tp_list = Net::Kafka::TopicPartitionList->new();
$tp_list->add("my_topic", 0);
$tp_list->set_offset("my_topic", 0, 958349923);    # the timestamp is passed through the offset field
$consumer->offsets_for_times($tp_list);
my $offset = $tp_list->offset("my_topic", 0);
Look up the offsets for the given partitions by timestamp.
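A common use of this lookup is rewinding a partition to a point in time. The sketch below combines offsets_for_times() with seek() as they are documented in this file. The helper name seek_to_timestamp is hypothetical. Treating a negative result as "no usable offset" is an assumption of this example, and seek() is assumed to be called on a partition that is already being consumed.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): resolve a timestamp to an
# offset and move the consumer there. $tp_list is expected to be an empty
# topic-partition list, e.g. Net::Kafka::TopicPartitionList->new().
sub seek_to_timestamp {
    my ($consumer, $tp_list, $topic, $partition, $timestamp) = @_;

    $tp_list->add($topic, $partition);
    # The timestamp travels in the offset field of the list entry.
    $tp_list->set_offset($topic, $partition, $timestamp);
    # The lookup replaces the timestamp with the matching offset in place.
    $consumer->offsets_for_times($tp_list);

    my $offset = $tp_list->offset($topic, $partition);
    # Assumption: a negative value is a logical or invalid offset, so skip it.
    return if !defined $offset || $offset < 0;

    $consumer->seek($topic, $partition, $offset);
    return $offset;
}

# Intended usage:
#   my $tp_list = Net::Kafka::TopicPartitionList->new();
#   my $offset  = seek_to_timestamp($consumer, $tp_list, "my_topic", 0, $timestamp);
```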
-
pause()
my $tp_list = Net::Kafka::TopicPartitionList->new();
$tp_list->add("my_topic", 0);
$consumer->pause($tp_list);    # pauses consumption of partition 0 of "my_topic"
Pause consumption for the provided list of partitions.
-
resume()
my $tp_list = Net::Kafka::TopicPartitionList->new();
$tp_list->add("my_topic", 0);
$consumer->resume($tp_list);    # resumes consumption of partition 0 of "my_topic"
Resume consumption for the provided list of partitions.
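pause() and resume() are often paired to apply backpressure when the application cannot keep up. The following sketch shows one possible approach with two thresholds, so that the consumer does not flip between states on every message. The helper name apply_backpressure, the state hash and the high/low limits are all hypothetical; only pause() and resume() come from this documentation.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): pause the given partitions
# once the local backlog reaches $limit{high}, and resume them only after
# it has drained to $limit{low}. $state is a hashref owned by the caller.
sub apply_backpressure {
    my ($consumer, $tp_list, $state, $backlog, %limit) = @_;

    if (!$state->{paused} && $backlog >= $limit{high}) {
        $consumer->pause($tp_list);
        $state->{paused} = 1;
    }
    elsif ($state->{paused} && $backlog <= $limit{low}) {
        $consumer->resume($tp_list);
        $state->{paused} = 0;
    }
    return $state->{paused} ? 1 : 0;
}

# Intended usage inside a poll loop, where @queue holds unprocessed work:
#   my %state;
#   apply_backpressure($consumer, $tp_list, \%state, scalar @queue,
#       high => 1000, low => 100);
```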
-
subscription()
my $topics = $consumer->subscription();
Returns the current topic subscription.
-
partitions_for()
my $partitions = $consumer->partitions_for("my_topic");
Returns an ARRAYREF that contains partition metadata information about the given topic (leader, replicas, ISR replicas).
-
commit()
$consumer->commit();     # commit current partition assignment (blocking call)
$consumer->commit(1);    # commit current partition assignment (non-blocking call)
my $tp_list = Net::Kafka::TopicPartitionList->new();
$tp_list->add("my_topic", 0);
$tp_list->set_offset("my_topic", 0, 12345);
$consumer->commit(0, $tp_list);    # commit $tp_list assignment (blocking call)
Commits offsets on the broker for the provided list of partitions. If no partitions are provided, the current assignment is committed instead.
-
commit_message()
my $message = $consumer->poll(1000);
$consumer->commit_message(0, $message);    # commit message (blocking call)
$consumer->commit_message(1, $message);    # commit message (non-blocking call)
Commit message's offset on broker for the message's partition.
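A typical reason to commit per message is at-least-once processing: the offset is committed only after the message has been handled. The sketch below assumes the consumer was created with 'enable.auto.commit' set to 'false'. The helper name process_and_commit and the bounded number of polls are choices made for this example, while poll(), err(), payload() and commit_message() are used as documented in this file.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): poll up to $max_polls times,
# hand each payload to $handler and commit the message afterwards.
sub process_and_commit {
    my ($consumer, $handler, $max_polls, $timeout_ms) = @_;
    my $processed = 0;

    for (1 .. $max_polls) {
        my $msg = $consumer->poll($timeout_ms) or next;    # nothing arrived in time
        next if $msg->err;                                  # error or event: nothing to commit
        $handler->($msg->payload);                          # if this dies, the offset stays uncommitted
        $consumer->commit_message(0, $msg);                 # 0 selects the blocking variant
        $processed++;
    }
    return $processed;
}

# Intended usage:
#   my $count = process_and_commit($consumer, sub { print shift, "\n" }, 100, 1000);
```

Committing after the handler returns means a crash in the handler leads to redelivery rather than loss, at the cost of possible duplicates.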
-
position()
my $position_list = Net::Kafka::TopicPartitionList->new();
$position_list->add("my_topic", 0);
$consumer->position($position_list);
my $position = $position_list->offset("my_topic", 0);
Retrieves current positions (offsets) for topics+partitions. The offset field of each requested partition will be set to the offset of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was no previous message.
Note: in this context the last consumed message is the offset consumed by the current librdkafka instance and, in case of rebalancing, not necessarily the last message fetched from the partition.
-
seek()
$consumer->seek("my_topic", 0, 12345);                        # seek partition 0 of "my_topic" to offset "12345"
$consumer->seek("my_topic", 0, RD_KAFKA_OFFSET_BEGINNING);    # seek to the beginning of "my_topic" partition 0
$consumer->seek("my_topic", 0, RD_KAFKA_OFFSET_END);          # seek to the end of "my_topic" partition 0
Seeks the consumer for topic+partition to the given offset, which is either an absolute or a logical offset.
-
query_watermark_offsets()
my ($low, $high) = $consumer->query_watermark_offsets("my_topic", 0);
Queries the Kafka broker for the lowest and highest watermark offsets in the given topic-partition.
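Watermark offsets and the current position can be combined to estimate how far a consumer is behind on a partition. The sketch below is one way to do that under stated assumptions: the helper name partition_lag is hypothetical, a negative position is taken to mean that this instance has not consumed anything yet, and in that case the lag is counted from the low watermark, which gives an upper bound.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): estimate consumer lag for one
# partition. $tp_list is expected to be an empty topic-partition list,
# e.g. Net::Kafka::TopicPartitionList->new().
sub partition_lag {
    my ($consumer, $tp_list, $topic, $partition) = @_;

    $tp_list->add($topic, $partition);
    $consumer->position($tp_list);    # fills in "last consumed offset + 1"
    my $position = $tp_list->offset($topic, $partition);

    my ($low, $high) = $consumer->query_watermark_offsets($topic, $partition);

    # Assumption: a negative position means nothing was consumed by this
    # instance yet, so count from the low watermark (an upper bound).
    $position = $low if !defined $position || $position < 0;

    return $high - $position;
}

# Intended usage:
#   my $lag = partition_lag($consumer, Net::Kafka::TopicPartitionList->new(),
#       "my_topic", 0);
```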
-
close()
$consumer->close();
Closes all consumer handles. Call it before your application exits to make sure that all outstanding requests are flushed.
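To make sure close() is always reached, the poll loop can be wrapped so that the consumer is closed even when message handling fails. This is a minimal sketch: the helper name run_consumer and its callback arguments are hypothetical, and it only uses poll(), err() and close() as documented in this file.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): poll until $keep_running
# returns false, pass each message to $on_message, and always close the
# consumer afterwards.
sub run_consumer {
    my ($consumer, $keep_running, $on_message, $timeout_ms) = @_;

    my $ok = eval {
        while ($keep_running->()) {
            my $msg = $consumer->poll($timeout_ms) or next;    # poll timed out
            if ($msg->err) {
                warn "Kafka error code: " . $msg->err . "\n";
                next;
            }
            $on_message->($msg);
        }
        1;
    };
    my $failure = $@;

    $consumer->close;           # runs even when a handler died
    die $failure unless $ok;    # re-raise the original error after closing
    return;
}

# Intended usage with a signal handler that stops the loop:
#   my $running = 1;
#   local $SIG{INT} = sub { $running = 0 };
#   run_consumer($consumer, sub { $running }, sub { print $_[0]->payload, "\n" }, 1000);
```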
This class maps to the rd_kafka_message_t structure from librdkafka and represents a message or an event. Objects of this class have the following methods:
-
err()
return error code from the message
-
topic()
return topic name
-
partition()
return partition number
-
offset()
return offset. Note, that the value is truncated to 32 bit if your perl doesn't support 64 bit integers.
-
key()
return message key
-
payload()
return message payload
-
headers()
return a copy of message headers
-
detach_headers()
return message headers and remove them from the message
This class contains a list of Kafka headers (it allows duplicates). Objects of this class have the following methods:
-
new()
create a new instance
-
add(name, value)
append a new name/value pair to the header list
-
remove(name)
remove all headers with the given name, if any
-
get_last(name)
return the last value associated with a given name
-
to_hash()
return a hash-of-arrays containing all headers
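Because the header list allows duplicates, get_last() is the usual way to read a single effective value per name. The sketch below illustrates that with two hypothetical helpers, add_pairs and pick_last, which only call the add() and get_last() methods listed above and work on any object that provides them, such as one created with Net::Kafka::Headers->new().

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::Kafka): append name/value pairs to a
# headers object in order. Repeated names are kept, as the class allows.
sub add_pairs {
    my ($headers, @pairs) = @_;
    while (my ($name, $value) = splice @pairs, 0, 2) {
        $headers->add($name, $value);
    }
    return $headers;
}

# Hypothetical helper: build a plain name => value list that contains the
# last value stored for each requested header name.
sub pick_last {
    my ($headers, @names) = @_;
    return map { ( $_ => $headers->get_last($_) ) } @names;
}

# Intended usage:
#   my $headers = add_pairs(Net::Kafka::Headers->new(),
#       trace => 'abc', retry => 1, retry => 2);
#   my %last = pick_last($headers, 'trace', 'retry');
```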
This class provides static methods to convert error codes into names and descriptions.
-
rd_kafka_get_err_descs()
rd_kafka_get_err_descs()
returns a hash mapping error codes to description strings.
-
to_string()
to_string($code)
return the description string for this error code.
-
to_name()
to_name($code)
return the name of this error code.
Message offset is truncated to 32 bit if perl is compiled without support for 64 bit integers.
- https://github.com/edenhill/librdkafka
- https://github.com/trinitum/perl-Kafka-Librd
Copyright (C) 2016, 2017 Pavel Shaydo
Copyright (C) 2018, 2019 Booking.com
This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.