Skip to content

Commit d1cc2d7

Browse files
authored
Merge pull request #1264 from qkdreyer/patch-2
Allow rdkafka falsy keys
2 parents 66837b8 + 664374b commit d1cc2d7

File tree

2 files changed

+62
-19
lines changed

2 files changed

+62
-19
lines changed

pkg/rdkafka/RdKafkaProducer.php

+2-19
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public function send(Destination $destination, Message $message): void
3737
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
3838
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
3939

40-
$partition = $this->getPartition($destination, $message);
40+
$partition = $message->getPartition() ?? $destination->getPartition() ?? RD_KAFKA_PARTITION_UA;
4141
$payload = $this->serializer->toString($message);
42-
$key = $message->getKey() ?: $destination->getKey() ?: null;
42+
$key = $message->getKey() ?? $destination->getKey() ?? null;
4343

4444
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
4545

@@ -122,21 +122,4 @@ public function flush(int $timeout): void
122122
$this->producer->flush($timeout);
123123
}
124124
}
125-
126-
/**
127-
* @param RdKafkaTopic $destination
128-
* @param RdKafkaMessage $message
129-
*/
130-
private function getPartition(Destination $destination, Message $message): int
131-
{
132-
if (null !== $message->getPartition()) {
133-
return $message->getPartition();
134-
}
135-
136-
if (null !== $destination->getPartition()) {
137-
return $destination->getPartition();
138-
}
139-
140-
return \RD_KAFKA_PARTITION_UA;
141-
}
142125
}

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+60
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,66 @@ public function testShouldGetPartitionFromDestination(): void
323323
$producer->send($destination, $message);
324324
}
325325

326+
public function testShouldAllowFalsyKeyFromMessage(): void
327+
{
328+
$key = 0;
329+
330+
$kafkaTopic = $this->createKafkaTopicMock();
331+
$kafkaTopic
332+
->expects($this->once())
333+
->method('producev')
334+
->with(
335+
RD_KAFKA_PARTITION_UA,
336+
0,
337+
'',
338+
$key
339+
)
340+
;
341+
342+
$kafkaProducer = $this->createKafkaProducerMock();
343+
$kafkaProducer
344+
->expects($this->once())
345+
->method('newTopic')
346+
->willReturn($kafkaTopic)
347+
;
348+
349+
$message = new RdKafkaMessage();
350+
$message->setKey($key);
351+
352+
$producer = new RdKafkaProducer($kafkaProducer, $this->createSerializerMock());
353+
$producer->send(new RdKafkaTopic(''), $message);
354+
}
355+
356+
public function testShouldAllowFalsyKeyFromDestination(): void
357+
{
358+
$key = 0;
359+
360+
$kafkaTopic = $this->createKafkaTopicMock();
361+
$kafkaTopic
362+
->expects($this->once())
363+
->method('producev')
364+
->with(
365+
RD_KAFKA_PARTITION_UA,
366+
0,
367+
'',
368+
$key
369+
)
370+
;
371+
372+
$kafkaProducer = $this->createKafkaProducerMock();
373+
$kafkaProducer
374+
->expects($this->once())
375+
->method('newTopic')
376+
->willReturn($kafkaTopic)
377+
;
378+
379+
$destination = new RdKafkaTopic('');
380+
$destination->setKey($key);
381+
382+
$producer = new RdKafkaProducer($kafkaProducer, $this->createSerializerMock());
383+
$producer->send($destination, new RdKafkaMessage());
384+
}
385+
326386
/**
327387
* @return \PHPUnit\Framework\MockObject\MockObject|ProducerTopic
328388
*/

0 commit comments

Comments
 (0)