diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index db21b9311..d5380b590 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -37,7 +37,7 @@ public function send(Destination $destination, Message $message): void InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class); - $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA; + $partition = $this->getPartition($destination, $message); $payload = $this->serializer->toString($message); $key = $message->getKey() ?: $destination->getKey() ?: null; @@ -53,17 +53,17 @@ public function send(Destination $destination, Message $message): void trigger_error( 'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '. 'Falling back to `produce` (without message headers) instead.', - E_USER_WARNING + \E_USER_WARNING ); } else { - $topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders()); + $topic->producev($partition, 0 /* must be 0 */ , $payload, $key, $message->getHeaders()); $this->producer->poll(0); return; } } - $topic->produce($partition, 0 /* must be 0 */, $payload, $key); + $topic->produce($partition, 0 /* must be 0 */ , $payload, $key); $this->producer->poll(0); } @@ -122,4 +122,21 @@ public function flush(int $timeout): void $this->producer->flush($timeout); } } + + /** + * @param RdKafkaTopic $destination + * @param RdKafkaMessage $message + */ + private function getPartition(Destination $destination, Message $message): int + { + if (null !== $message->getPartition()) { + return $message->getPartition(); + } + + if (null !== $destination->getPartition()) { + return $destination->getPartition(); + } + + return \RD_KAFKA_PARTITION_UA; + } } diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index 467f1a43e..01c1c1b4c 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -225,6 +225,104 @@ public function testShouldAllowSerializersToSerializeKeys() $producer->send(new RdKafkaTopic('theQueueName'), $message); } + public function testShouldGetPartitionFromMessage(): void + { + $partition = 1; + + $kafkaTopic = $this->createKafkaTopicMock(); + $kafkaTopic + ->expects($this->once()) + ->method('producev') + ->with( + $partition, + 0, + 'theSerializedMessage', + 'theSerializedKey' + ) + ; + + $kafkaProducer = $this->createKafkaProducerMock(); + $kafkaProducer + ->expects($this->once()) + ->method('newTopic') + ->willReturn($kafkaTopic) + ; + $kafkaProducer + ->expects($this->once()) + ->method('poll') + ->with(0) + ; + $messageHeaders = ['bar' => 'barVal']; + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders); + $message->setKey('key'); + $message->setPartition($partition); + + $serializer = $this->createSerializerMock(); + $serializer + ->expects($this->once()) + ->method('toString') + ->willReturnCallback(function () use ($message) { + $message->setKey('theSerializedKey'); + + return 'theSerializedMessage'; + }) + ; + + $destination = new RdKafkaTopic('theQueueName'); + + $producer = new RdKafkaProducer($kafkaProducer, $serializer); + $producer->send($destination, $message); + } + + public function testShouldGetPartitionFromDestination(): void + { + $partition = 2; + + $kafkaTopic = $this->createKafkaTopicMock(); + $kafkaTopic + ->expects($this->once()) + ->method('producev') + ->with( + $partition, + 0, + 'theSerializedMessage', + 'theSerializedKey' + ) + ; + + $kafkaProducer = $this->createKafkaProducerMock(); + $kafkaProducer + ->expects($this->once()) + ->method('newTopic') + ->willReturn($kafkaTopic) + ; + $kafkaProducer + ->expects($this->once()) + ->method('poll') + ->with(0) + ; + $messageHeaders = ['bar' => 'barVal']; + $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders); + $message->setKey('key'); + + $serializer = $this->createSerializerMock(); + $serializer + ->expects($this->once()) + ->method('toString') + ->willReturnCallback(function () use ($message) { + $message->setKey('theSerializedKey'); + + return 'theSerializedMessage'; + }) + ; + + $destination = new RdKafkaTopic('theQueueName'); + $destination->setPartition($partition); + + $producer = new RdKafkaProducer($kafkaProducer, $serializer); + $producer->send($destination, $message); + } + /** * @return \PHPUnit\Framework\MockObject\MockObject|ProducerTopic */