Skip to content

Commit 0ae852b

Browse files
authored
Merge pull request #508 from Flaconi/allow-subscribe-or-assign
Allow either subscribe or assign in RdKafkaConsumer
2 parents 5cca2d1 + 2bd9408 commit 0ae852b

File tree

2 files changed

+46
-12
lines changed

2 files changed

+46
-12
lines changed

pkg/rdkafka/RdKafkaConsumer.php

+10-8
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
5555
$this->topic = $topic;
5656
$this->subscribed = false;
5757
$this->commitAsync = false;
58-
$this->offset = null;
5958

6059
$this->setSerializer($serializer);
6160
}
@@ -98,13 +97,16 @@ public function getQueue()
9897
*/
9998
public function receive($timeout = 0)
10099
{
101-
if (false == $this->subscribed) {
102-
$this->consumer->assign([new TopicPartition(
103-
$this->getQueue()->getQueueName(),
104-
$this->getQueue()->getPartition(),
105-
$this->offset
106-
)]);
107-
100+
if (false === $this->subscribed) {
101+
if (null === $this->offset) {
102+
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
103+
} else {
104+
$this->consumer->assign([new TopicPartition(
105+
$this->getQueue()->getQueueName(),
106+
$this->getQueue()->getPartition(),
107+
$this->offset
108+
)]);
109+
}
108110
$this->subscribed = true;
109111
}
110112

pkg/rdkafka/Tests/RdKafkaConsumerTest.php

+36-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
5050
$kafkaConsumer = $this->createKafkaConsumerMock();
5151
$kafkaConsumer
5252
->expects($this->once())
53-
->method('assign')
53+
->method('subscribe')
5454
;
5555
$kafkaConsumer
5656
->expects($this->once())
@@ -79,7 +79,7 @@ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign()
7979
$kafkaConsumer = $this->createKafkaConsumerMock();
8080
$kafkaConsumer
8181
->expects($this->once())
82-
->method('assign')
82+
->method('subscribe')
8383
;
8484
$kafkaConsumer
8585
->expects($this->any())
@@ -106,6 +106,36 @@ public function testShouldSubscribeOnFirstReceiveOnly()
106106
$kafkaMessage = new Message();
107107
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
108108

109+
$kafkaConsumer = $this->createKafkaConsumerMock();
110+
$kafkaConsumer
111+
->expects($this->once())
112+
->method('subscribe')
113+
;
114+
$kafkaConsumer
115+
->expects($this->any())
116+
->method('consume')
117+
->willReturn($kafkaMessage)
118+
;
119+
120+
$consumer = new RdKafkaConsumer(
121+
$kafkaConsumer,
122+
$this->createContextMock(),
123+
$destination,
124+
$this->createSerializerMock()
125+
);
126+
127+
$consumer->receive(1000);
128+
$consumer->receive(1000);
129+
$consumer->receive(1000);
130+
}
131+
132+
public function testShouldAssignWhenOffsetIsSet()
133+
{
134+
$destination = new RdKafkaTopic('dest');
135+
136+
$kafkaMessage = new Message();
137+
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
138+
109139
$kafkaConsumer = $this->createKafkaConsumerMock();
110140
$kafkaConsumer
111141
->expects($this->once())
@@ -124,6 +154,8 @@ public function testShouldSubscribeOnFirstReceiveOnly()
124154
$this->createSerializerMock()
125155
);
126156

157+
$consumer->setOffset(123);
158+
127159
$consumer->receive(1000);
128160
$consumer->receive(1000);
129161
$consumer->receive(1000);
@@ -139,7 +171,7 @@ public function testThrowOnOffsetChangeAfterSubscribing()
139171
$kafkaConsumer = $this->createKafkaConsumerMock();
140172
$kafkaConsumer
141173
->expects($this->once())
142-
->method('assign')
174+
->method('subscribe')
143175
;
144176
$kafkaConsumer
145177
->expects($this->any())
@@ -174,7 +206,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
174206
$kafkaConsumer = $this->createKafkaConsumerMock();
175207
$kafkaConsumer
176208
->expects($this->once())
177-
->method('assign')
209+
->method('subscribe')
178210
;
179211
$kafkaConsumer
180212
->expects($this->once())

0 commit comments

Comments
 (0)