diff --git a/pkg/sqs/SqsClient.php b/pkg/sqs/SqsClient.php index 65cf2fb29..4fc87fabb 100644 --- a/pkg/sqs/SqsClient.php +++ b/pkg/sqs/SqsClient.php @@ -43,6 +43,11 @@ public function receiveMessage(array $args): Result return $this->callApi('receiveMessage', $args); } + public function changeMessageVisibility(array $args): Result + { + return $this->callApi('changeMessageVisibility', $args); + } + public function purgeQueue(array $args): Result { return $this->callApi('purgeQueue', $args); diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index b6ded7e20..3c8714e3d 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -133,14 +133,19 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class); - $this->context->getSqsClient()->deleteMessage([ - '@region' => $this->queue->getRegion(), - 'QueueUrl' => $this->context->getQueueUrl($this->queue), - 'ReceiptHandle' => $message->getReceiptHandle(), - ]); - if ($requeue) { - $this->context->createProducer()->send($this->queue, $message); + $this->context->getSqsClient()->changeMessageVisibility([ + '@region' => $this->queue->getRegion(), + 'QueueUrl' => $this->context->getQueueUrl($this->queue), + 'ReceiptHandle' => $message->getReceiptHandle(), + 'VisibilityTimeout' => 0, + ]); + } else { + $this->context->getSqsClient()->deleteMessage([ + '@region' => $this->queue->getRegion(), + 'QueueUrl' => $this->context->getQueueUrl($this->queue), + 'ReceiptHandle' => $message->getReceiptHandle(), + ]); } } diff --git a/pkg/sqs/Tests/SqsConsumerTest.php b/pkg/sqs/Tests/SqsConsumerTest.php index 5d1f1cdeb..67e6f8fc4 100644 --- a/pkg/sqs/Tests/SqsConsumerTest.php +++ b/pkg/sqs/Tests/SqsConsumerTest.php @@ -203,26 +203,15 @@ public function testShouldRejectMessageAndRequeue() $client = $this->createSqsClientMock(); $client ->expects($this->once()) - ->method('deleteMessage') + ->method('changeMessageVisibility') ->with($this->identicalTo([ - '@region' => null, + '@region' => 'theRegion', 'QueueUrl' => 'theQueueUrl', 'ReceiptHandle' => 'theReceipt', + 'VisibilityTimeout' => 0, ])) ; - $message = new SqsMessage(); - $message->setReceiptHandle('theReceipt'); - - $destination = new SqsDestination('queue'); - - $producer = $this->createProducerMock(); - $producer - ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($destination), $this->identicalTo($message)) - ; - $context = $this->createContextMock(); $context ->expects($this->once()) @@ -235,11 +224,16 @@ public function testShouldRejectMessageAndRequeue() ->willReturn('theQueueUrl') ; $context - ->expects($this->once()) + ->expects($this->never()) ->method('createProducer') - ->willReturn($producer) ; + $message = new SqsMessage(); + $message->setReceiptHandle('theReceipt'); + + $destination = new SqsDestination('queue'); + $destination->setRegion('theRegion'); + $consumer = new SqsConsumer($context, $destination); $consumer->reject($message, true); }