PHP-rdkafka is a thin librdkafka binding providing a working PHP 5 / PHP 7 Kafka 0.8 client.

It supports the consumer, producer, and metadata APIs.

The API resembles librdkafka's as closely as possible.
- Installation
- Examples
- Usage
- API
  - RdKafka\Consumer
  - RdKafka\Producer
  - RdKafka
  - RdKafka\Conf
  - RdKafka\TopicConf
  - RdKafka\Topic
  - RdKafka\ConsumerTopic
  - RdKafka\ProducerTopic
  - RdKafka\Message
  - RdKafka\Queue
  - RdKafka\Exception
  - RdKafka\Metadata
    - RdKafka\Metadata\Topic
    - RdKafka\Metadata\Broker
    - RdKafka\Metadata\Partition
    - RdKafka\Metadata\Collection
- Functions
- Constants
- Credits
- License
php-rdkafka is compatible with PHP 5 (master branch, PECL release) and has an experimental [PHP 7 branch](https://github.com/arnaud-lb/php-rdkafka/tree/php7).

For PHP 7, installation from source is preferred.
sudo pecl install channel://pecl.php.net/rdkafka-alpha
For PHP version 7, make sure to use the php7 branch.
phpize
./configure
make
sudo make install
# Add extension=rdkafka.so to your php.ini:
echo extension=rdkafka.so|sudo tee -a /path/to/php.ini
See examples
For producing, we first need to create a producer and add brokers (Kafka servers) to it:
<?php
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");
Next, we create a topic instance from the producer:
<?php
$topic = $rk->newTopic("test");
From there, we can produce as many messages as we want, using the produce method:
<?php
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
The second argument is the message flags and should currently always be 0.
The message payload can be anything.
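Because produce() is asynchronous, it can be useful to wait for queued messages to be delivered before the script exits. A minimal sketch, reusing the $rk producer and $topic from above:

```php
<?php

// Produce a few messages to the unassigned partition.
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
}

// Serve events and wait until the output queue is empty,
// i.e. all messages have been sent or have failed.
while ($rk->outqLen() > 0) {
    $rk->poll(50);
}
```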
For consuming, we first need to create a consumer and add brokers (Kafka servers) to it:
<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");
Next, create a topic instance by calling the newTopic()
method, and start
consuming on partition 0:
<?php
$topic = $rk->newTopic("test");
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
Next, retrieve the consumed messages:
<?php
while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:
Creating the queue:
<?php
$queue = $rk->newQueue();
Adding topic+partitions (topars) to the queue:
<?php
$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
Next, retrieve the consumed messages from the queue:
<?php
while (true) {
    // The only argument is the timeout.
    $msg = $queue->consume(1000);
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
librdkafka can store offsets in a local file, or on the broker. The default is a local file, and as soon as you start using RD_KAFKA_OFFSET_STORED as the consuming offset, rdkafka starts to store the offset.

By default, the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting the offset.store.path configuration property.

Other interesting properties are: offset.store.sync.interval.ms, offset.store.method, auto.commit.interval.ms, auto.commit.enable, and group.id.
<?php
$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", 1e3);
$topicConf->set("offset.store.sync.interval.ms", 60e3);
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
$producer = new RdKafka\Producer(RdKafka\Conf $conf = null);
Creates a new Kafka producer and starts its operation.
$conf is an optional RdKafka\Conf instance that will be used instead of the default configuration. The $conf object is copied, and changing $conf after that has no effect on the producer.

See RdKafka\Conf for more information.
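For example, a producer can be created with a custom configuration (a sketch; the property values shown are only illustrative):

```php
<?php

$conf = new RdKafka\Conf();
// Compress message batches with gzip.
$conf->set("compression.codec", "gzip");
// Buffer messages for up to 50 ms before sending a batch.
$conf->set("queue.buffering.max.ms", "50");

$producer = new RdKafka\Producer($conf);
$producer->addBrokers("10.0.0.1");
```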
$topic = $producer->newTopic(string $topic, RdKafka\TopicConf $conf = null);
Creates a new RdKafka\ProducerTopic instance for topic named $topic.

$conf is an optional configuration for the topic that will be used instead of the default topic configuration. The $conf object is copied by this function, and changing $conf after that has no effect on the topic.

See RdKafka\TopicConf for more information.
$qlen = $producer->outqLen();
Returns the current out queue length: messages waiting to be sent to, or acknowledged by, the broker.
$producer->poll(int $timeout_ms);
Polls the Producer handle for events.
$consumer = new RdKafka\Consumer(RdKafka\Conf $conf = null);
Creates a new Kafka consumer and starts its operation.
$conf is an optional RdKafka\Conf instance that will be used instead of the default configuration. The $conf object is copied, and changing $conf after that has no effect on the consumer.

See RdKafka\Conf for more information.
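For example, a consumer can be created with a group.id, which is needed for broker-based offset storage (a sketch; the group name is arbitrary):

```php
<?php

$conf = new RdKafka\Conf();
$conf->set("group.id", "myGroup");

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("10.0.0.1");
```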
$queue = $consumer->newQueue();
Returns a RdKafka\Queue instance.
$topic = $consumer->newTopic(string $topic, RdKafka\TopicConf $conf = null);
Creates a new RdKafka\ConsumerTopic instance for topic named $topic.

$conf is an optional configuration for the topic that will be used instead of the default topic configuration. The $conf object is copied by this function, and changing $conf after that has no effect on the topic.

See RdKafka\TopicConf for more information.
RdKafka is the base class for RdKafka\Producer and RdKafka\Consumer.
$rk->addBrokers(string $brokerList);
Adds one or more brokers to the instance's list of initial brokers. Additional brokers will be discovered automatically as soon as rdkafka connects to a broker by querying the broker metadata.
If a broker name resolves to multiple addresses (and possibly address families) all will be used for connection attempts in round-robin fashion.
$brokerList is a comma-separated list of brokers in the format: <host1>[:<port1>],<host2>[:<port2>]...
Example:
$rk->addBrokers("10.0.0.1:9092,10.0.0.2");
Returns the number of brokers successfully added.
NOTE: Brokers may also be defined with the metadata.broker.list configuration property.
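For example (a sketch; the addresses are placeholders):

```php
<?php

$conf = new RdKafka\Conf();
$conf->set("metadata.broker.list", "10.0.0.1:9092,10.0.0.2:9092");

// No addBrokers() call is needed in this case.
$rk = new RdKafka\Consumer($conf);
```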
$rk->setLogLevel(int $level);
Specifies the maximum logging level produced by internal kafka logging and debugging.

If the debug configuration property is set, the level is automatically adjusted to LOG_DEBUG.

Valid values for $level are any of the syslog LOG_* priorities: https://php.net/manual/en/function.syslog.php
$metadata = $rk->metadata(bool $all_topics, RdKafka\Topic $only_topic = null, int $timeout_ms);
Request metadata from the broker.
- all_topics - if true: request info about all topics in the cluster; if false: only request info about locally known topics.
- only_topic - only request info about this topic.
- timeout_ms - maximum response time before failing.

Returns a RdKafka\Metadata instance.
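A sketch of requesting metadata for all topics and iterating over the result (assuming $rk is a consumer or producer with brokers added):

```php
<?php

// Wait up to 1000 ms for the metadata response.
$metadata = $rk->metadata(true, null, 1000);

printf("Connected to broker %s\n", $metadata->getOrigBrokerName());

foreach ($metadata->getTopics() as $topic) {
    printf("Topic %s has %d partitions\n",
        $topic->getTopic(), count($topic->getPartitions()));
}
```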
$conf = new RdKafka\Conf();
Creates a new configuration. The list of available configuration properties is documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
$dump = $conf->dump();
Dump the configuration properties and values to an array.
$conf->set(string $name, string $value);
Sets a configuration property.
Throws a RdKafka\Exception on error.
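For example (a sketch; the property and value are illustrative):

```php
<?php

$conf = new RdKafka\Conf();
$conf->set("metadata.broker.list", "10.0.0.1:9092");

// dump() returns the configuration as an array.
var_dump($conf->dump());
```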
$conf = new RdKafka\TopicConf();
Creates a new topic configuration. See RdKafka\Conf.
$topicConf->setPartitioner(int $partitioner);
Sets the partitioner callback.
Allowed values are RD_KAFKA_MSG_PARTITIONER_RANDOM and RD_KAFKA_MSG_PARTITIONER_CONSISTENT.
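A sketch of selecting the consistent partitioner for a producer topic (the topic name is a placeholder):

```php
<?php

$topicConf = new RdKafka\TopicConf();
// Identical message keys will be mapped to identical partitions.
$topicConf->setPartitioner(RD_KAFKA_MSG_PARTITIONER_CONSISTENT);

$topic = $producer->newTopic("test", $topicConf);
```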
RdKafka\Topic is the base class for RdKafka\ConsumerTopic and RdKafka\ProducerTopic.
$name = $topic->getName();
Returns the topic name.
New ConsumerTopic instances can be created by calling RdKafka\Consumer::newTopic().
$topic->consumeStart(int $partition, int $offset);
Start consuming messages for $partition at offset $offset, which may either be a proper offset (0..N) or one of the special offsets: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED, or rd_kafka_offset_tail(..).

rdkafka will attempt to keep queued.min.messages (config property) messages in the local queue by repeatedly fetching batches of messages from the broker until the threshold is reached.

The application shall use the consume() method to consume messages from the local queue, each kafka message being represented as a RdKafka\Message object.

consumeStart() must not be called multiple times for the same topic and partition without stopping consumption first with consumeStop().

Throws a RdKafka\Exception on error.
$topic->consumeStop(int $partition);
Stop consuming messages for $partition, purging all messages currently in the local queue.

Throws a RdKafka\Exception on error.
$topic->consumeQueueStart(int $partition, int $offset, RdKafka\Queue $queue);
Same as consumeStart(), but re-routes incoming messages to the provided queue $queue.

The application must use one of the RdKafka\Queue::consume*() functions to receive fetched messages.

consumeQueueStart() must not be called multiple times for the same topic and partition without stopping consumption first with consumeStop().

consumeStart() and consumeQueueStart() must not be combined for the same topic and partition.

Throws a RdKafka\Exception on error.
$message = $topic->consume(int $partition, int $timeout_ms);
Consume a single message from $partition.

$timeout_ms is the maximum amount of time to wait for a message to be received.

The consumer must have been previously started with consumeStart().

Returns NULL on timeout.

Throws a RdKafka\Exception on error.

NOTE: The returned message's ->err must be checked for errors.

NOTE: ->err == RD_KAFKA_RESP_ERR__PARTITION_EOF signals that the end of the partition has been reached, which should typically not be considered an error. The application should handle this case (e.g., ignore it).
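A consume loop taking the timeout and partition-EOF cases into account might look like this (a sketch, assuming consumption was started on partition 0):

```php
<?php

while (true) {
    $msg = $topic->consume(0, 1000);

    if (null === $msg) {
        // Timed out without receiving a message.
        continue;
    }

    if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Reached the current end of the partition; not an error.
        continue;
    }

    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    }

    echo $msg->payload, "\n";
}
```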
See Topic::getName()
$topic->offsetStore($message->partition, $message->offset+1);
Store offset $offset for partition $partition of this topic. The offset will be committed (written) to the offset store according to auto.commit.interval.ms.

NOTE: auto.commit.enable must be set to "false" when using this API.

Throws a RdKafka\Exception on error.
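A sketch combining these requirements, assuming a consumer $rk and a topic named "test":

```php
<?php

$topicConf = new RdKafka\TopicConf();
// Required: disable automatic offset commit when using offsetStore().
$topicConf->set("auto.commit.enable", "false");

$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err) {
        // Skip timeouts and per-partition errors in this sketch.
        continue;
    }

    // Process the message, then store the next offset to consume.
    echo $msg->payload, "\n";
    $topic->offsetStore($msg->partition, $msg->offset + 1);
}
```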
New ProducerTopic instances can be created by calling RdKafka\Producer::newTopic().
See Topic::getName()
$topic->produce(int $partition, int $msgflags, string $payload, string $key = null)
Produce and send a single message to the broker.

produce() is an asynchronous, non-blocking API.

$partition is the target partition, either:
- RD_KAFKA_PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
- a fixed partition (0..N)

$msgflags must be 0.

$payload is the message payload.

$key is an optional message key. If non-NULL, it will be passed to the topic partitioner as well as be sent with the message to the broker and passed on to the consumer.

Throws a RdKafka\Exception on error.
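For example, producing keyed messages (a sketch; keys and payloads are placeholders). With the consistent partitioner configured on the topic (see RdKafka\TopicConf), identical keys map to identical partitions:

```php
<?php

foreach (["user-1", "user-2", "user-1"] as $key) {
    // The key is passed to the topic partitioner and is also
    // sent to the broker along with the payload.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "event for $key", $key);
}
```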
A Kafka message as returned by the consuming methods.
This object has two purposes:
- provide the application with a consumed message (->err == 0)
- report per-topic+partition consumer errors (->err != 0)

The application must check err to decide what action to take.
- err: Non-zero for error signaling. Use errstr() for a string representation.
- topic_name: Topic name.
- partition: Partition.
- payload: When err == 0: the message payload.
- key: When err == 0: optional message key.
- offset: When err == 0: message offset.
$errstr = $message->errstr();
When err != 0, returns the string representation of the error.
New Queue instances can be created by calling RdKafka\Consumer::newQueue().
Message queues allow the application to re-route consumed messages from multiple topic+partitions into one single queue point. This queue point, containing messages from a number of topic+partitions, may then be served by a single consume() call, rather than one per topic+partition combination.

See RdKafka\ConsumerTopic::consumeQueueStart(), RdKafka\Queue::consume().
$message = $queue->consume(int $timeout_ms);
See RdKafka\ConsumerTopic::consume()
Exceptions thrown by php-rdkafka are of this type.
Metadata container.
See RdKafka::metadata().
$id = $metadata->getOrigBrokerId();
Returns the broker originating this metadata.
$name = $metadata->getOrigBrokerName();
Returns the name of the originating broker.
$brokers = $metadata->getBrokers();

printf("There are %d brokers", count($brokers));

foreach ($brokers as $broker) {
    // ...
}

Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Broker.
$topics = $metadata->getTopics();

printf("There are %d topics", count($topics));

foreach ($topics as $topic) {
    // ...
}

Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Topic.
Metadata: Broker information.
$id = $broker->getId();
Returns the broker id.
$host = $broker->getHost();
Returns the broker hostname.
$port = $broker->getPort();
Returns the broker port.
Metadata: Topic information.
$name = $topic->getTopic();
Returns the topic name.
$err = $topic->getErr();
Returns the topic error reported by the broker.
$partitions = $topic->getPartitions();

printf("There are %d partitions", count($partitions));

foreach ($partitions as $partition) {
    // ...
}

Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Partition.
Metadata: Partition information.
$id = $partition->getId();
Returns the partition id.
$err = $partition->getErr();
Returns the partition error reported by the broker.
$leader = $partition->getLeader();
Returns the leader broker id.
$replicas = $partition->getReplicas();

printf("There are %d replicas", count($replicas));

foreach ($replicas as $replica) {
    // ...
}

Returns a RdKafka\Metadata\Collection of replica broker ids for this partition.
$replicas = $partition->getIsrs();

printf("There are %d In-Sync-Replicas", count($replicas));

foreach ($replicas as $replica) {
    // ...
}

Returns a RdKafka\Metadata\Collection of In-Sync-Replica broker ids for this partition.
RdKafka\Metadata\Collection implements Iterator (it can be used in foreach) and Countable (it can be used with count()).
$str = rd_kafka_err2str($err);
Returns a human readable representation of a kafka error.

$err = rd_kafka_errno2err($errno);
Converts errno to a rd_kafka_resp_err_t error code.

$errno = rd_kafka_errno();
Returns errno.
$offset = rd_kafka_offset_tail($cnt);
Returns a special offset to start consuming $cnt messages from the topic's current .._END offset.

That is, if the current end offset is 12345 and $cnt is 200, it will start consuming from offset 12345-200 = 12145.
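For example, to start consuming the last 200 messages of partition 0 (a sketch, reusing a ConsumerTopic $topic):

```php
<?php

$topic->consumeStart(0, rd_kafka_offset_tail(200));
```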
- RD_KAFKA_OFFSET_BEGINNING: start consuming from the beginning of the kafka partition queue (oldest message).
- RD_KAFKA_OFFSET_END: start consuming from the end of the kafka partition queue (next message).
- RD_KAFKA_OFFSET_STORED: start consuming from the offset retrieved from the offset store.
- RD_KAFKA_PARTITION_UA: unassigned partition. The unassigned partition is used by the producer API for messages that should be partitioned using the configured or default partitioner.
RD_KAFKA_VERSION: the librdkafka version, interpreted as hex MM.mm.rr.xx:
- MM = Major
- mm = minor
- rr = revision
- xx = currently unused
E.g.: 0x00080100 = 0.8.1
RD_KAFKA_RESP_ERR_* error code constants (see rd_kafka_resp_err_t):
- Begin internal error codes
- Received message is incorrect
- Bad/unknown compression
- Broker is going away
- Generic failure
- Broker transport error
- Critical system resource failure
- Failed to resolve broker
- Produced message timed out
- Reached the end of the topic+partition queue on the broker. Not really an error.
- Permanent: partition does not exist in cluster.
- File or filesystem error
- Permanent: topic does not exist in cluster.
- All broker connections are down.
- Invalid argument, or invalid configuration
- Operation timed out
- Queue is full
- ISR count < required.acks
- End internal error codes
RD_KAFKA_MSG_PARTITIONER_RANDOM: Random partitioner. This is the default partitioner. Returns a random partition between 0 and the number of partitions minus 1.

RD_KAFKA_MSG_PARTITIONER_CONSISTENT: Consistent partitioner. Uses consistent hashing to map identical keys onto identical partitions. Returns a partition between 0 and the number of partitions minus 1, based on the CRC value of the key.
Documentation copied from librdkafka.h.
Authors: see contributors.
php-rdkafka is released under the MIT license.