From 257fdb5fbcb95d141ced3de2fc0d1dfd2a53fb3d Mon Sep 17 00:00:00 2001 From: Adrian Shum Date: Fri, 11 Jan 2019 23:16:24 +0800 Subject: [PATCH 1/4] SQS requeue: terminate visibility timeout instead of recreate message --- pkg/sqs/SqsConsumer.php | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index b6ded7e20..99929623a 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -133,14 +133,19 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class); - $this->context->getSqsClient()->deleteMessage([ - '@region' => $this->queue->getRegion(), - 'QueueUrl' => $this->context->getQueueUrl($this->queue), - 'ReceiptHandle' => $message->getReceiptHandle(), - ]); - if ($requeue) { - $this->context->createProducer()->send($this->queue, $message); + $this->context->getAwsSqsClient()->changeMessageVisibility([ + '@region' => $this->queue->getRegion(), + 'QueueUrl' => $this->context->getQueueUrl($this->queue), + 'ReceiptHandle' => $message->getReceiptHandle(), + 'VisibilityTimeout' => 0, + ]); + } else { + $this->context->getSqsClient()->deleteMessage([ + '@region' => $this->queue->getRegion(), + 'QueueUrl' => $this->context->getQueueUrl($this->queue), + 'ReceiptHandle' => $message->getReceiptHandle(), + ]); } } From 69fba8c890355e8844a6deccda671bb04cfd6c50 Mon Sep 17 00:00:00 2001 From: Adrian Shum Date: Fri, 11 Jan 2019 23:20:07 +0800 Subject: [PATCH 2/4] update SqsClient --- pkg/sqs/SqsClient.php | 5 +++++ pkg/sqs/SqsConsumer.php | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/sqs/SqsClient.php b/pkg/sqs/SqsClient.php index 65cf2fb29..4fc87fabb 100644 --- a/pkg/sqs/SqsClient.php +++ b/pkg/sqs/SqsClient.php @@ -43,6 +43,11 @@ public function receiveMessage(array $args): Result return $this->callApi('receiveMessage', $args); } + public function changeMessageVisibility(array $args): Result + { + return $this->callApi('changeMessageVisibility', $args); + } + public function purgeQueue(array $args): Result { return $this->callApi('purgeQueue', $args); diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index 99929623a..3c8714e3d 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -134,7 +134,7 @@ public function reject(Message $message, bool $requeue = false): void InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class); if ($requeue) { - $this->context->getAwsSqsClient()->changeMessageVisibility([ + $this->context->getSqsClient()->changeMessageVisibility([ '@region' => $this->queue->getRegion(), 'QueueUrl' => $this->context->getQueueUrl($this->queue), 'ReceiptHandle' => $message->getReceiptHandle(), From 6c1cf35b96b17cb6459cef655807c80ab73321e5 Mon Sep 17 00:00:00 2001 From: Adrian Shum Date: Mon, 14 Jan 2019 16:13:26 +0800 Subject: [PATCH 3/4] attempt fix test --- pkg/sqs/Tests/SqsConsumerTest.php | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/sqs/Tests/SqsConsumerTest.php b/pkg/sqs/Tests/SqsConsumerTest.php index 5d1f1cdeb..61e336771 100644 --- a/pkg/sqs/Tests/SqsConsumerTest.php +++ b/pkg/sqs/Tests/SqsConsumerTest.php @@ -203,19 +203,15 @@ public function testShouldRejectMessageAndRequeue() $client = $this->createSqsClientMock(); $client ->expects($this->once()) - ->method('deleteMessage') + ->method('changeMessageVisibility') ->with($this->identicalTo([ '@region' => null, 'QueueUrl' => 'theQueueUrl', 'ReceiptHandle' => 'theReceipt', + 'VisibilityTimeout' => 0, ])) ; - $message = new SqsMessage(); - $message->setReceiptHandle('theReceipt'); - - $destination = new SqsDestination('queue'); - $producer = $this->createProducerMock(); $producer ->expects($this->once()) @@ -235,11 +231,15 @@ public function testShouldRejectMessageAndRequeue() ->willReturn('theQueueUrl') ; $context - ->expects($this->once()) + ->expects($this->never()) ->method('createProducer') - ->willReturn($producer) ; + $message = new SqsMessage(); + $message->setReceiptHandle('theReceipt'); + + $destination = new SqsDestination('queue'); + $consumer = new SqsConsumer($context, $destination); $consumer->reject($message, true); } From 128d926a85bf4479debdebf754041727f18256ea Mon Sep 17 00:00:00 2001 From: Adrian Shum Date: Mon, 14 Jan 2019 16:26:12 +0800 Subject: [PATCH 4/4] attempt fix test --- pkg/sqs/Tests/SqsConsumerTest.php | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/pkg/sqs/Tests/SqsConsumerTest.php b/pkg/sqs/Tests/SqsConsumerTest.php index 61e336771..67e6f8fc4 100644 --- a/pkg/sqs/Tests/SqsConsumerTest.php +++ b/pkg/sqs/Tests/SqsConsumerTest.php @@ -205,20 +205,13 @@ public function testShouldRejectMessageAndRequeue() ->expects($this->once()) ->method('changeMessageVisibility') ->with($this->identicalTo([ - '@region' => null, + '@region' => 'theRegion', 'QueueUrl' => 'theQueueUrl', 'ReceiptHandle' => 'theReceipt', 'VisibilityTimeout' => 0, ])) ; - $producer = $this->createProducerMock(); - $producer - ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($destination), $this->identicalTo($message)) - ; - $context = $this->createContextMock(); $context ->expects($this->once()) @@ -239,6 +232,7 @@ public function testShouldRejectMessageAndRequeue() $message->setReceiptHandle('theReceipt'); $destination = new SqsDestination('queue'); + $destination->setRegion('theRegion'); $consumer = new SqsConsumer($context, $destination); $consumer->reject($message, true);