diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 491da858c..e99e0cf3b 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -82,7 +82,11 @@ public function getQueue() */ public function receive($timeout = 0) { - $this->consumer->subscribe([$this->topic->getTopicName()]); + if (false == $this->subscribed) { + $this->consumer->subscribe([$this->topic->getTopicName()]); + + $this->subscribed = true; + } $message = null; if ($timeout > 0) { @@ -95,8 +99,6 @@ public function receive($timeout = 0) } } - $this->consumer->unsubscribe(); - return $message; } diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index e158423b9..0a67e5935 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -29,12 +29,18 @@ class RdKafkaContext implements PsrContext */ private $producer; + /** + * @var KafkaConsumer[] + */ + private $kafkaConsumers; + /** * @param array $config */ public function __construct(array $config) { $this->config = $config; + $this->kafkaConsumers = []; $this->setSerializer(new JsonSerializer()); } @@ -94,8 +100,10 @@ public function createConsumer(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); + $this->kafkaConsumers[] = $kafkaConsumer = new KafkaConsumer($this->getConf()); + $consumer = new RdKafkaConsumer( - new KafkaConsumer($this->getConf()), + $kafkaConsumer, $this, $destination, $this->getSerializer() @@ -113,6 +121,12 @@ public function createConsumer(PsrDestination $destination) */ public function close() { + $kafkaConsumers = $this->kafkaConsumers; + $this->kafkaConsumers = []; + + foreach ($kafkaConsumers as $kafkaConsumer) { + $kafkaConsumer->unsubscribe(); + } } /** diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 1fc654777..5acd7f4dc 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -59,9 +59,34 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() ->with(1000) ->willReturn($kafkaMessage) ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); + + $this->assertNull($consumer->receive(1000)); + } + + public function testShouldSubscribeOnFirstReceiveOnly() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('unsubscribe') + ->method('subscribe') + ->with(['dest']) + ; + $kafkaConsumer + ->expects($this->any()) + ->method('consume') + ->willReturn($kafkaMessage) ; $consumer = new RdKafkaConsumer( @@ -71,7 +96,9 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() $this->createSerializerMock() ); - $this->assertNull($consumer->receive(1000)); + $consumer->receive(1000); + $consumer->receive(1000); + $consumer->receive(1000); } public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() @@ -96,10 +123,6 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() ->with(1000) ->willReturn($kafkaMessage) ; - $kafkaConsumer - ->expects($this->once()) - ->method('unsubscribe') - ; $serializer = $this->createSerializerMock(); $serializer