Skip to content

Commit bfa1217

Browse files
Merge pull request #1 from PicPay/master
Changed: Change partition recovery hierachy in RdKafkaProducer
2 parents 3880257 + 8474482 commit bfa1217

File tree

2 files changed

+119
-4
lines changed

2 files changed

+119
-4
lines changed

pkg/rdkafka/RdKafkaProducer.php

+21-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function send(Destination $destination, Message $message): void
3737
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
3838
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
3939

40-
$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
40+
$partition = $this->getPartition($destination, $message);
4141
$payload = $this->serializer->toString($message);
4242
$key = $message->getKey() ?: $destination->getKey() ?: null;
4343

@@ -53,17 +53,17 @@ public function send(Destination $destination, Message $message): void
5353
trigger_error(
5454
'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`. '.
5555
'Falling back to `produce` (without message headers) instead.',
56-
E_USER_WARNING
56+
\E_USER_WARNING
5757
);
5858
} else {
59-
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
59+
$topic->producev($partition, 0 /* must be 0 */ , $payload, $key, $message->getHeaders());
6060
$this->producer->poll(0);
6161

6262
return;
6363
}
6464
}
6565

66-
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
66+
$topic->produce($partition, 0 /* must be 0 */ , $payload, $key);
6767
$this->producer->poll(0);
6868
}
6969

@@ -122,4 +122,21 @@ public function flush(int $timeout): void
122122
$this->producer->flush($timeout);
123123
}
124124
}
125+
126+
/**
127+
* @param RdKafkaTopic $destination
128+
* @param RdKafkaMessage $message
129+
*/
130+
private function getPartition(Destination $destination, Message $message): int
131+
{
132+
if (null !== $message->getPartition()) {
133+
return $message->getPartition();
134+
}
135+
136+
if (null !== $destination->getPartition()) {
137+
return $destination->getPartition();
138+
}
139+
140+
return \RD_KAFKA_PARTITION_UA;
141+
}
125142
}

pkg/rdkafka/Tests/RdKafkaProducerTest.php

+98
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,104 @@ public function testShouldAllowSerializersToSerializeKeys()
225225
$producer->send(new RdKafkaTopic('theQueueName'), $message);
226226
}
227227

228+
public function testShouldGetPartitionFromMessage(): void
229+
{
230+
$partition = 1;
231+
232+
$kafkaTopic = $this->createKafkaTopicMock();
233+
$kafkaTopic
234+
->expects($this->once())
235+
->method('producev')
236+
->with(
237+
$partition,
238+
0,
239+
'theSerializedMessage',
240+
'theSerializedKey'
241+
)
242+
;
243+
244+
$kafkaProducer = $this->createKafkaProducerMock();
245+
$kafkaProducer
246+
->expects($this->once())
247+
->method('newTopic')
248+
->willReturn($kafkaTopic)
249+
;
250+
$kafkaProducer
251+
->expects($this->once())
252+
->method('poll')
253+
->with(0)
254+
;
255+
$messageHeaders = ['bar' => 'barVal'];
256+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
257+
$message->setKey('key');
258+
$message->setPartition($partition);
259+
260+
$serializer = $this->createSerializerMock();
261+
$serializer
262+
->expects($this->once())
263+
->method('toString')
264+
->willReturnCallback(function () use ($message) {
265+
$message->setKey('theSerializedKey');
266+
267+
return 'theSerializedMessage';
268+
})
269+
;
270+
271+
$destination = new RdKafkaTopic('theQueueName');
272+
273+
$producer = new RdKafkaProducer($kafkaProducer, $serializer);
274+
$producer->send($destination, $message);
275+
}
276+
277+
public function testShouldGetPartitionFromDestination(): void
278+
{
279+
$partition = 2;
280+
281+
$kafkaTopic = $this->createKafkaTopicMock();
282+
$kafkaTopic
283+
->expects($this->once())
284+
->method('producev')
285+
->with(
286+
$partition,
287+
0,
288+
'theSerializedMessage',
289+
'theSerializedKey'
290+
)
291+
;
292+
293+
$kafkaProducer = $this->createKafkaProducerMock();
294+
$kafkaProducer
295+
->expects($this->once())
296+
->method('newTopic')
297+
->willReturn($kafkaTopic)
298+
;
299+
$kafkaProducer
300+
->expects($this->once())
301+
->method('poll')
302+
->with(0)
303+
;
304+
$messageHeaders = ['bar' => 'barVal'];
305+
$message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], $messageHeaders);
306+
$message->setKey('key');
307+
308+
$serializer = $this->createSerializerMock();
309+
$serializer
310+
->expects($this->once())
311+
->method('toString')
312+
->willReturnCallback(function () use ($message) {
313+
$message->setKey('theSerializedKey');
314+
315+
return 'theSerializedMessage';
316+
})
317+
;
318+
319+
$destination = new RdKafkaTopic('theQueueName');
320+
$destination->setPartition($partition);
321+
322+
$producer = new RdKafkaProducer($kafkaProducer, $serializer);
323+
$producer->send($destination, $message);
324+
}
325+
228326
/**
229327
* @return \PHPUnit\Framework\MockObject\MockObject|ProducerTopic
230328
*/

0 commit comments

Comments
 (0)