Skip to content

Commit 781eb0d

Browse files
authored
Merge pull request #254 from flix-tech/feature-enable-key-serialization
[RdKafka] Enable serializers to serialize message keys
2 parents 53040ff + 58c7d4f commit 781eb0d

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

pkg/rdkafka/RdKafkaProducer.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public function send(PsrDestination $destination, PsrMessage $message)
4141
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
4242

4343
$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
44-
$key = $message->getKey() ?: $destination->getKey() ?: null;
4544
$payload = $this->serializer->toString($message);
45+
$key = $message->getKey() ?: $destination->getKey() ?: null;
4646

4747
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
4848
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+39
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,45 @@ public function testShouldAllowGetPreviouslySetSerializer()
9595
$this->assertSame($expectedSerializer, $producer->getSerializer());
9696
}
9797

98+
public function testShouldAllowSerializersToSerializeKeys()
99+
{
100+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
101+
$message->setKey('key');
102+
103+
$kafkaTopic = $this->createKafkaTopicMock();
104+
$kafkaTopic
105+
->expects($this->once())
106+
->method('produce')
107+
->with(
108+
RD_KAFKA_PARTITION_UA,
109+
0,
110+
'theSerializedMessage',
111+
'theSerializedKey'
112+
)
113+
;
114+
115+
$kafkaProducer = $this->createKafkaProducerMock();
116+
$kafkaProducer
117+
->expects($this->once())
118+
->method('newTopic')
119+
->willReturn($kafkaTopic)
120+
;
121+
122+
$serializer = $this->createSerializerMock();
123+
$serializer
124+
->expects($this->once())
125+
->method('toString')
126+
->willReturnCallback(function () use ($message) {
127+
$message->setKey('theSerializedKey');
128+
129+
return 'theSerializedMessage';
130+
})
131+
;
132+
133+
$producer = new RdKafkaProducer($kafkaProducer, $serializer);
134+
$producer->send(new RdKafkaTopic('theQueueName'), $message);
135+
}
136+
98137
/**
99138
* @return \PHPUnit_Framework_MockObject_MockObject|ProducerTopic
100139
*/

0 commit comments

Comments
 (0)