diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 22d4a8e47..a82770ab8 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -138,6 +138,18 @@ public function purgeQueue(Queue $queue): void throw PurgeQueueNotSupportedException::providerDoestNotSupportIt(); } + public static function getLibrdKafkaVersion(): string + { + if (!defined('RD_KAFKA_VERSION')) { + throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed'); + } + $major = (RD_KAFKA_VERSION & 0xFF000000) >> 24; + $minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16; + $patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8; + + return "$major.$minor.$patch"; + } + private function getProducer(): VendorProducer { if (null === $this->producer) { diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 3f6d1923f..77ec9115b 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -42,6 +42,25 @@ public function send(Destination $destination, Message $message): void $key = $message->getKey() ?: $destination->getKey() ?: null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); + + // Note: Topic::producev method exists in phprdkafka > 3.1.0 + // Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version + if (method_exists($topic, 'producev')) { + // Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka >= 1.0.0 causing segfault + if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=') + && version_compare(phpversion('rdkafka'), '3.1.0', '<=')) { + trigger_error( + 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '. + 'Falling back to `produce` (without message headers) instead.', + E_USER_WARNING + ); + } else { + $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + + return; + } + } + $topic->produce($partition, 0 /* must be 0 */, $payload, $key); } diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index be5b94a6a..1f8e7db8b 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -45,18 +45,20 @@ public function testThrowIfMessageInvalid() public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube() { - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $messageHeaders = ['bar' => 'barVal']; + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders); $message->setKey('key'); $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ->with( RD_KAFKA_PARTITION_UA, 0, 'theSerializedMessage', - 'key' + 'key', + $messageHeaders ) ; @@ -87,7 +89,7 @@ public function testShouldPassNullAsTopicConfigIfNotSetOnTopic() $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ; $kafkaProducer = $this->createKafkaProducerMock(); @@ -123,7 +125,7 @@ public function testShouldPassCustomConfAsTopicConfigIfSetOnTopic() $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ; $kafkaProducer = $this->createKafkaProducerMock(); @@ -165,13 +167,14 @@ public function testShouldAllowGetPreviouslySetSerializer() public function testShouldAllowSerializersToSerializeKeys() { - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $messageHeaders = ['bar' => 'barVal']; + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders); $message->setKey('key'); $kafkaTopic = $this->createKafkaTopicMock(); $kafkaTopic ->expects($this->once()) - ->method('produce') + ->method('producev') ->with( RD_KAFKA_PARTITION_UA, 0,