From 879e6fb4605418f27e75216f59a845dbe082daa0 Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Wed, 25 Sep 2019 08:53:05 +0200 Subject: [PATCH 1/6] Use producev to support headers in kafka --- pkg/rdkafka/RdKafkaProducer.php | 2 +- pkg/rdkafka/composer.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 3f6d1923f..facdf8879 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -42,7 +42,7 @@ public function send(Destination $destination, Message $message): void $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); - $topic->produce($partition, 0 /* must be 0 */, $payload, $key); + $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); } /** diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index e0d55b396..d82de4909 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "ext-rdkafka": "^3.0.3", + "ext-rdkafka": "^3.1.0", "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": { From 65134d7af52f575403245dff5df7e9aecc898e95 Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Wed, 25 Sep 2019 09:13:26 +0200 Subject: [PATCH 2/6] Fix tests --- pkg/rdkafka/Tests/RdKafkaProducerTest.php | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index be5b94a6a..1f8e7db8b 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -45,18 +45,20 @@ public function testThrowIfMessageInvalid() public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube() { - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $messageHeaders = ['bar' => 'barVal']; + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders); $message->setKey('key'); $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ->with( RD_KAFKA_PARTITION_UA, 0, 'theSerializedMessage', - 'key' + 'key', + $messageHeaders ) ; @@ -87,7 +89,7 @@ public function testShouldPassNullAsTopicConfigIfNotSetOnTopic() $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ; $kafkaProducer = $this->createKafkaProducerMock(); @@ -123,7 +125,7 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic() $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ; $kafkaProducer = $this->createKafkaProducerMock(); @@ -165,13 +167,14 @@ public function testShouldAllowGetPreviouslySetSerializer() public function testShouldAllowSerializersToSerializeKeys() { - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $messageHeaders = ['bar' => 'barVal']; + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders); $message->setKey('key'); $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ->with( RD_KAFKA_PARTITION_UA, 0, From 733caddf549370b5aa46d2d76b4101344ac330cc Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Wed, 25 Sep 2019 11:15:29 +0200 Subject: [PATCH 3/6] Maintain backward compatibility --- pkg/rdkafka/RdKafkaContext.php | 11 +++++++++++ pkg/rdkafka/RdKafkaProducer.php | 19 ++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 22d4a8e47..950125296 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -138,6 +138,17 @@ public function purgeQueue(Queue $queue): void throw PurgeQueueNotSupportedException::providerDoestNotSupportIt(); } + public static function getLibrdKafkaVersion(): string + { + if (!defined('RD_KAFKA_VERSION')) { + throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed'); + } + $major = (RD_KAFKA_VERSION & 0xFF000000) >> 24; + $minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16; + $patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8; + return "$major.$minor.$patch"; + } + private function getProducer(): VendorProducer { if (null === $this->producer) { diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index facdf8879..a6135c50c 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -42,7 +42,24 @@ public function send(Destination $destination, Message $message): void $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); - $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + + // Note: Topic::producev method exists in phprdkafka > 3.1.0 + // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version + if (method_exists($topic, 'producev')) { + // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka >= 1.0.0 causing segfault + if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') + && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { + trigger_error( + 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', + E_USER_WARNING + ); + } else { + $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + return; + } + } + + $topic->produce($partition, 0 /* must be 0 */, $payload, $key); } /** From 748189d6fb9cf6cb9851737a018e197e7f93676c Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Wed, 25 Sep 2019 11:23:09 +0200 Subject: [PATCH 4/6] Update warning message --- pkg/rdkafka/RdKafkaProducer.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index a6135c50c..747c3ce02 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -50,7 +50,8 @@ public function send(Destination $destination, Message $message): void if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { trigger_error( - 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', + 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. ' . + 'Falling back to `produce` (without message headers) instead.', E_USER_WARNING ); } else { From aa646583628684327de0e661c86a3055a3a7224b Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Wed, 25 Sep 2019 11:34:08 +0200 Subject: [PATCH 5/6] =?UTF-8?q?CS=C2=A0fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rdkafka/RdKafkaContext.php | 1 + pkg/rdkafka/RdKafkaProducer.php | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 950125296..a82770ab8 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -146,6 +146,7 @@ public static function getLibrdKafkaVersion(): string $major = (RD_KAFKA_VERSION & 0xFF000000) >> 24; $minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16; $patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8; + return "$major.$minor.$patch"; } diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 747c3ce02..77ec9115b 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -50,12 +50,13 @@ public function send(Destination $destination, Message $message): void if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { trigger_error( - 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. ' . + 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '. 'Falling back to `produce` (without message headers) instead.', E_USER_WARNING ); } else { $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + return; } } From 2ae46550edc00558322cab2f68a774cc4cbb5917 Mon Sep 17 00:00:00 2001 From: TiMESPLiNTER Date: Wed, 25 Sep 2019 13:33:42 +0200 Subject: [PATCH 6/6] Revert version bump of rdkafka dep --- pkg/rdkafka/composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index d82de4909..e0d55b396 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "ext-rdkafka": "^3.1.0", + "ext-rdkafka": "^3.0.3", "queue-interop/queue-interop": "^0.7|^0.8" }, "require-dev": {