Skip to content

Commit 5d10587

Browse files
committed
add more logs
1 parent 0c25a37 commit 5d10587

File tree

3 files changed

+8
-2
lines changed

3 files changed

+8
-2
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "danikdantist/queue-wrapper",
3-
"version": "0.0.3",
3+
"version": "0.0.4",
44
"description": "My first Composer project",
55
"keywords": [
66
"queue",

src/Drivers/Kafka/Consumer.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ protected function init()
7575
$conf->set("enable.auto.commit", "false");
7676
$conf->set("enable.auto.offset.store", "false");
7777

78+
$conf->setErrorCb(function ($kafka, $err, $reason) {
79+
$this->logError(sprintf("%s (reason: %s)\n", rd_kafka_err2str($err), $reason));
80+
});
81+
7882
$topicConf = new \RdKafka\TopicConf();
7983

8084
$topicConf->set('auto.commit.enable', 'false');

src/Drivers/Kafka/Producer.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ protected function init()
5151
$conf->set('queued.max.messages.kbytes',100000000);
5252
$conf->set('socket.send.buffer.bytes',1000000);
5353
$conf->set('queue.buffering.max.messages',10000000);
54-
//$conf->set('queue.buffering.max.ms',1);
5554

55+
$conf->setErrorCb(function ($kafka, $err, $reason) {
56+
$this->logError(sprintf("%s (reason: %s)\n", rd_kafka_err2str($err), $reason));
57+
});
5658

5759
$rk = new \RdKafka\Producer($conf);
5860
$rk->setLogLevel(LOG_DEBUG);

0 commit comments

Comments
 (0)