From d99b2e10d3d5c3e0311a475ecae705e02e82acd4 Mon Sep 17 00:00:00 2001 From: Alexander Onatskiy Date: Fri, 11 Jun 2021 21:16:45 +0300 Subject: [PATCH] added possibility to send message attributes using snsqs transport --- pkg/snsqs/SnsQsMessage.php | 29 +++++++++++++++++++++++++-- pkg/snsqs/SnsQsProducer.php | 13 ++---------- pkg/snsqs/Tests/SnsQsProducerTest.php | 22 ++++++++++++++++++++ 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/pkg/snsqs/SnsQsMessage.php b/pkg/snsqs/SnsQsMessage.php index e34a103ff..63a5c1d72 100644 --- a/pkg/snsqs/SnsQsMessage.php +++ b/pkg/snsqs/SnsQsMessage.php @@ -17,12 +17,27 @@ class SnsQsMessage implements Message */ private $sqsMessage; - public function __construct(string $body = '', array $properties = [], array $headers = []) - { + /** + * @var array|null + */ + private $messageAttributes; + + /** + * See AWS documentation for message attribute structure. + * + * @see https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sns-2010-03-31.html#shape-messageattributevalue + */ + public function __construct( + string $body = '', + array $properties = [], + array $headers = [], + array $messageAttributes = null + ) { $this->body = $body; $this->properties = $properties; $this->headers = $headers; $this->redelivered = false; + $this->messageAttributes = $messageAttributes; } public function setSqsMessage(SqsMessage $message): void @@ -34,4 +49,14 @@ public function getSqsMessage(): SqsMessage { return $this->sqsMessage; } + + public function getMessageAttributes(): ?array + { + return $this->messageAttributes; + } + + public function setMessageAttributes(?array $messageAttributes): void + { + $this->messageAttributes = $messageAttributes; + } } diff --git a/pkg/snsqs/SnsQsProducer.php b/pkg/snsqs/SnsQsProducer.php index bdce7f895..99054286f 100644 --- a/pkg/snsqs/SnsQsProducer.php +++ b/pkg/snsqs/SnsQsProducer.php @@ -51,11 +51,7 @@ public function send(Destination $destination, Message $message): void InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class); if (false == $destination instanceof SnsQsTopic && false == $destination instanceof SnsQsQueue) { - throw new InvalidDestinationException(sprintf( - 'The destination must be an instance of [%s|%s] but got %s.', - SnsQsTopic::class, SnsQsQueue::class, - is_object($destination) ? get_class($destination) : gettype($destination) - )); + throw new InvalidDestinationException(sprintf('The destination must be an instance of [%s|%s] but got %s.', SnsQsTopic::class, SnsQsQueue::class, is_object($destination) ? get_class($destination) : gettype($destination))); } if ($destination instanceof SnsQsTopic) { @@ -64,6 +60,7 @@ public function send(Destination $destination, Message $message): void $message->getProperties(), $message->getHeaders() ); + $snsMessage->setMessageAttributes($message->getMessageAttributes()); $this->getSnsProducer()->send($destination, $snsMessage); } else { @@ -79,10 +76,6 @@ public function send(Destination $destination, Message $message): void /** * Delivery delay is supported by SQSProducer. - * - * @param int|null $deliveryDelay - * - * @return Producer */ public function setDeliveryDelay(int $deliveryDelay = null): Producer { @@ -93,8 +86,6 @@ public function setDeliveryDelay(int $deliveryDelay = null): Producer /** * Delivery delay is supported by SQSProducer. - * - * @return int|null */ public function getDeliveryDelay(): ?int { diff --git a/pkg/snsqs/Tests/SnsQsProducerTest.php b/pkg/snsqs/Tests/SnsQsProducerTest.php index d0925d15e..4444c888b 100644 --- a/pkg/snsqs/Tests/SnsQsProducerTest.php +++ b/pkg/snsqs/Tests/SnsQsProducerTest.php @@ -3,6 +3,7 @@ namespace Enqueue\SnsQs\Tests; use Enqueue\Sns\SnsContext; +use Enqueue\Sns\SnsMessage; use Enqueue\Sns\SnsProducer; use Enqueue\SnsQs\SnsQsMessage; use Enqueue\SnsQs\SnsQsProducer; @@ -91,6 +92,7 @@ public function testShouldGetDeliveryDelayFromSQSProducer() public function testShouldSendSnsTopicMessageToSnsProducer() { $snsMock = $this->createSnsContextMock(); + $snsMock->method('createMessage')->willReturn(new SnsMessage()); $destination = new SnsQsTopic(''); $snsProducerStub = $this->prophesize(SnsProducer::class); @@ -102,6 +104,26 @@ public function testShouldSendSnsTopicMessageToSnsProducer() $producer->send($destination, new SnsQsMessage()); } + public function testShouldSendSnsTopicMessageWithAttributesToSnsProducer() + { + $snsMock = $this->createSnsContextMock(); + $snsMock->method('createMessage')->willReturn(new SnsMessage()); + $destination = new SnsQsTopic(''); + + $snsProducerStub = $this->prophesize(SnsProducer::class); + $snsProducerStub->send( + $destination, + Argument::that(function (SnsMessage $snsMessage) { + return $snsMessage->getMessageAttributes() === ['foo' => 'bar']; + }) + )->shouldBeCalledOnce(); + + $snsMock->method('createProducer')->willReturn($snsProducerStub->reveal()); + + $producer = new SnsQsProducer($snsMock, $this->createSqsContextMock()); + $producer->send($destination, new SnsQsMessage('', [], [], ['foo' => 'bar'])); + } + public function testShouldSendSqsMessageToSqsProducer() { $sqsMock = $this->createSqsContextMock();