Skip to content

Commit 2b2ea2a

Browse files
authored
Merge pull request #313 from php-enqueue/kafka-do-not-subscribe-on-every-receive-call
[rdkafka] Don't do unnecessary subscribe\unsubscribe on every receive call
2 parents b0fcee8 + 9d416e3 commit 2b2ea2a

File tree

3 files changed

+49
-10
lines changed

3 files changed

+49
-10
lines changed

pkg/rdkafka/RdKafkaConsumer.php

+5-3
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ public function getQueue()
8282
*/
8383
public function receive($timeout = 0)
8484
{
85-
$this->consumer->subscribe([$this->topic->getTopicName()]);
85+
if (false == $this->subscribed) {
86+
$this->consumer->subscribe([$this->topic->getTopicName()]);
87+
88+
$this->subscribed = true;
89+
}
8690

8791
$message = null;
8892
if ($timeout > 0) {
@@ -95,8 +99,6 @@ public function receive($timeout = 0)
9599
}
96100
}
97101

98-
$this->consumer->unsubscribe();
99-
100102
return $message;
101103
}
102104

pkg/rdkafka/RdKafkaContext.php

+15-1
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,18 @@ class RdKafkaContext implements PsrContext
2929
*/
3030
private $producer;
3131

32+
/**
33+
* @var KafkaConsumer[]
34+
*/
35+
private $kafkaConsumers;
36+
3237
/**
3338
* @param array $config
3439
*/
3540
public function __construct(array $config)
3641
{
3742
$this->config = $config;
43+
$this->kafkaConsumers = [];
3844

3945
$this->setSerializer(new JsonSerializer());
4046
}
@@ -94,8 +100,10 @@ public function createConsumer(PsrDestination $destination)
94100
{
95101
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
96102

103+
$this->kafkaConsumers[] = $kafkaConsumer = new KafkaConsumer($this->getConf());
104+
97105
$consumer = new RdKafkaConsumer(
98-
new KafkaConsumer($this->getConf()),
106+
$kafkaConsumer,
99107
$this,
100108
$destination,
101109
$this->getSerializer()
@@ -113,6 +121,12 @@ public function createConsumer(PsrDestination $destination)
113121
*/
114122
public function close()
115123
{
124+
$kafkaConsumers = $this->kafkaConsumers;
125+
$this->kafkaConsumers = [];
126+
127+
foreach ($kafkaConsumers as $kafkaConsumer) {
128+
$kafkaConsumer->unsubscribe();
129+
}
116130
}
117131

118132
/**

pkg/rdkafka/Tests/RdKafkaConsumerTest.php

+29-6
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,34 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
5959
->with(1000)
6060
->willReturn($kafkaMessage)
6161
;
62+
63+
$consumer = new RdKafkaConsumer(
64+
$kafkaConsumer,
65+
$this->createContextMock(),
66+
$destination,
67+
$this->createSerializerMock()
68+
);
69+
70+
$this->assertNull($consumer->receive(1000));
71+
}
72+
73+
public function testShouldSubscribeOnFirstReceiveOnly()
74+
{
75+
$destination = new RdKafkaTopic('dest');
76+
77+
$kafkaMessage = new Message();
78+
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
79+
80+
$kafkaConsumer = $this->createKafkaConsumerMock();
6281
$kafkaConsumer
6382
->expects($this->once())
64-
->method('unsubscribe')
83+
->method('subscribe')
84+
->with(['dest'])
85+
;
86+
$kafkaConsumer
87+
->expects($this->any())
88+
->method('consume')
89+
->willReturn($kafkaMessage)
6590
;
6691

6792
$consumer = new RdKafkaConsumer(
@@ -71,7 +96,9 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
7196
$this->createSerializerMock()
7297
);
7398

74-
$this->assertNull($consumer->receive(1000));
99+
$consumer->receive(1000);
100+
$consumer->receive(1000);
101+
$consumer->receive(1000);
75102
}
76103

77104
public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
@@ -96,10 +123,6 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
96123
->with(1000)
97124
->willReturn($kafkaMessage)
98125
;
99-
$kafkaConsumer
100-
->expects($this->once())
101-
->method('unsubscribe')
102-
;
103126

104127
$serializer = $this->createSerializerMock();
105128
$serializer

0 commit comments

Comments
 (0)