From 1b3b91a660286ef0bd716a1202ca9a3f45687b3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 31 May 2019 19:20:34 +0200 Subject: [PATCH 1/8] Allow sending & reading headers via Kafka Message object. Note that `Topic::producev` & `KafkaMessage::headers` exist since phprdkafka >= 3.1.0. --- pkg/rdkafka/RdKafkaConsumer.php | 10 +++++++++- pkg/rdkafka/RdKafkaProducer.php | 7 ++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index d7016a946..7c59dfd75 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -81,6 +81,7 @@ public function getQueue(): Queue } /** + * @param int $timeout * @return RdKafkaMessage */ public function receive(int $timeout = 0): ?Message @@ -140,7 +141,8 @@ public function acknowledge(Message $message): void } /** - * @param RdKafkaMessage $message + * @param Message $message + * @param bool $requeue */ public function reject(Message $message, bool $requeue = false): void { @@ -169,6 +171,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); + // Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's. + // Note: Requires phprdkafka >= 3.1.0 + if (isset($kafkaMessage->headers)) { + $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers)); + } + return $message; default: throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err); diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 3f6d1923f..040e1d878 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -42,7 +42,12 @@ public function send(Destination $destination, Message $message): void $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); - $topic->produce($partition, 0 /* must be 0 */, $payload, $key); + // Note: Topic::producev method exists in phprdkafka >= 3.1.0 + if (method_exists($topic, 'producev')) { + $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + } else { + $topic->produce($partition, 0 /* must be 0 */, $payload, $key); + } } /** From 3e930a1e49bc784c5f0a3097b35a8bb9058ead94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 31 May 2019 19:27:50 +0200 Subject: [PATCH 2/8] Allow sending & reading headers via Kafka Message object. Note that `Topic::producev` & `KafkaMessage::headers` exist since phprdkafka >= 3.1.0. --- pkg/rdkafka/RdKafkaProducer.php | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 040e1d878..cf0e1d158 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -43,6 +43,7 @@ public function send(Destination $destination, Message $message): void $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); // Note: Topic::producev method exists in phprdkafka >= 3.1.0 + // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); } else { From 735e67f6bb5dd9882a5423668fbe0633046d0c3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 31 May 2019 22:31:18 +0200 Subject: [PATCH 3/8] Allow sending & reading headers via Kafka Message object. Note that `Topic::producev` & `KafkaMessage::headers` exist since phprdkafka >= 3.1.0. --- pkg/rdkafka/RdKafkaProducer.php | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index cf0e1d158..18950ac5e 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -42,9 +42,15 @@ public function send(Destination $destination, Message $message): void $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); - // Note: Topic::producev method exists in phprdkafka >= 3.1.0 + // Note: Topic::producev method exists in phprdkafka > 3.1.0 // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { + + // Phprdkafka < 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault + if (version_compare($this->getLibrdKafkaVersion(), '1.0.0', '>=') + && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { + trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING); + } $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); } else { $topic->produce($partition, 0 /* must be 0 */, $payload, $key); @@ -98,4 +104,17 @@ public function getTimeToLive(): ?int { return null; } + + private function getLibrdKafkaVersion(): string + { + if (! defined('RD_KAFKA_VERSION')) { + throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed'); + } + + $major = (RD_KAFKA_VERSION & 0xFF000000) >> 24; + $minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16; + $patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8; + + return "$major.$minor.$patch"; + } } From c837eac55f7b0d6c8f5b3e1ce7d020e61ec3abe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 31 May 2019 22:33:56 +0200 Subject: [PATCH 4/8] Fix Code style --- pkg/rdkafka/RdKafkaConsumer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 7c59dfd75..ca8da7901 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -142,7 +142,7 @@ public function acknowledge(Message $message): void /** * @param Message $message - * @param bool $requeue + * @param bool $requeue */ public function reject(Message $message, bool $requeue = false): void { From 4b99deecc3f9788715501583b4aa668bacb3ef3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 31 May 2019 22:38:46 +0200 Subject: [PATCH 5/8] Fix comment --- pkg/rdkafka/RdKafkaProducer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 18950ac5e..62999f453 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -46,7 +46,7 @@ public function send(Destination $destination, Message $message): void // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { - // Phprdkafka < 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault + // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault if (version_compare($this->getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING); From 15634f043eaa71bb1cbbb3c3d6155cb600490ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Sat, 1 Jun 2019 00:02:26 +0200 Subject: [PATCH 6/8] Fix code style --- pkg/rdkafka/RdKafkaConsumer.php | 1 + pkg/rdkafka/RdKafkaProducer.php | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index ca8da7901..8f617f8b1 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -82,6 +82,7 @@ public function getQueue(): Queue /** * @param int $timeout + * * @return RdKafkaMessage */ public function receive(int $timeout = 0): ?Message diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 62999f453..f85732885 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -45,7 +45,6 @@ public function send(Destination $destination, Message $message): void // Note: Topic::producev method exists in phprdkafka > 3.1.0 // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { - // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault if (version_compare($this->getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { @@ -107,7 +106,7 @@ public function getTimeToLive(): ?int private function getLibrdKafkaVersion(): string { - if (! defined('RD_KAFKA_VERSION')) { + if (!defined('RD_KAFKA_VERSION')) { throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed'); } From fd45d2e6db04537082e1931579c8b0c6484857ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Sat, 1 Jun 2019 12:09:29 +0200 Subject: [PATCH 7/8] Move getLibrdkafka method to Context & make it static --- pkg/rdkafka/RdKafkaContext.php | 13 +++++++++++++ pkg/rdkafka/RdKafkaProducer.php | 15 +-------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 22d4a8e47..7fea8f5e0 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -56,6 +56,19 @@ public function __construct(array $config) $this->setSerializer(new JsonSerializer()); } + public static function getLibrdKafkaVersion(): string + { + if (!defined('RD_KAFKA_VERSION')) { + throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed'); + } + + $major = (RD_KAFKA_VERSION & 0xFF000000) >> 24; + $minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16; + $patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8; + + return "$major.$minor.$patch"; + } + /** * @return RdKafkaMessage */ diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index f85732885..5bb783064 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -46,7 +46,7 @@ public function send(Destination $destination, Message $message): void // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version if (method_exists($topic, 'producev')) { // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault - if (version_compare($this->getLibrdKafkaVersion(), '1.0.0', '>=') + if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING); } @@ -103,17 +103,4 @@ public function getTimeToLive(): ?int { return null; } - - private function getLibrdKafkaVersion(): string - { - if (!defined('RD_KAFKA_VERSION')) { - throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed'); - } - - $major = (RD_KAFKA_VERSION & 0xFF000000) >> 24; - $minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16; - $patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8; - - return "$major.$minor.$patch"; - } } From c809e8020bddb884db1e9780ef5baef61de068aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Sat, 1 Jun 2019 12:10:07 +0200 Subject: [PATCH 8/8] Revert phpdoc change --- pkg/rdkafka/RdKafkaConsumer.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 8f617f8b1..7fe65d5c6 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -142,8 +142,8 @@ public function acknowledge(Message $message): void } /** - * @param Message $message - * @param bool $requeue + * @param RdKafkaMessage $message + * @param bool $requeue */ public function reject(Message $message, bool $requeue = false): void {