Skip to content

Commit ab572a8

Browse files
authored
Merge pull request #1278 from onatskyy/snsqs/fifo_parameters_in_message
[SNSQS] added possibility to send FIFO-related parameters using snsqs transport
2 parents cb6fa4f + 9d39fdc commit ab572a8

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed

pkg/snsqs/SnsQsMessage.php

+46
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ class SnsQsMessage implements Message
2222
*/
2323
private $messageAttributes;
2424

25+
/**
26+
* @var string|null
27+
*/
28+
private $messageGroupId;
29+
30+
/**
31+
* @var string|null
32+
*/
33+
private $messageDeduplicationId;
34+
2535
/**
2636
* See AWS documentation for message attribute structure.
2737
*
@@ -59,4 +69,40 @@ public function setMessageAttributes(?array $messageAttributes): void
5969
{
6070
$this->messageAttributes = $messageAttributes;
6171
}
72+
73+
/**
74+
* Only FIFO.
75+
*
76+
* The token used for deduplication of sent messages. If a message with a particular MessageDeduplicationId is sent successfully,
77+
* any messages sent with the same MessageDeduplicationId are accepted successfully but aren't delivered during the 5-minute
78+
* deduplication interval. For more information, see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html#FIFO-queues-exactly-once-processing.
79+
*/
80+
public function setMessageDeduplicationId(string $id = null): void
81+
{
82+
$this->messageDeduplicationId = $id;
83+
}
84+
85+
public function getMessageDeduplicationId(): ?string
86+
{
87+
return $this->messageDeduplicationId;
88+
}
89+
90+
/**
91+
* Only FIFO.
92+
*
93+
* The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group
94+
* are processed in a FIFO manner (however, messages in different message groups might be processed out of order).
95+
* To interleave multiple ordered streams within a single queue, use MessageGroupId values (for example, session data
96+
* for multiple users). In this scenario, multiple readers can process the queue, but the session data
97+
* of each user is processed in a FIFO fashion.
98+
*/
99+
public function setMessageGroupId(string $id = null): void
100+
{
101+
$this->messageGroupId = $id;
102+
}
103+
104+
public function getMessageGroupId(): ?string
105+
{
106+
return $this->messageGroupId;
107+
}
62108
}

pkg/snsqs/SnsQsProducer.php

+5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public function send(Destination $destination, Message $message): void
6161
$message->getHeaders()
6262
);
6363
$snsMessage->setMessageAttributes($message->getMessageAttributes());
64+
$snsMessage->setMessageGroupId($message->getMessageGroupId());
65+
$snsMessage->setMessageDeduplicationId($message->getMessageDeduplicationId());
6466

6567
$this->getSnsProducer()->send($destination, $snsMessage);
6668
} else {
@@ -70,6 +72,9 @@ public function send(Destination $destination, Message $message): void
7072
$message->getHeaders()
7173
);
7274

75+
$sqsMessage->setMessageGroupId($message->getMessageGroupId());
76+
$sqsMessage->setMessageDeduplicationId($message->getMessageDeduplicationId());
77+
7378
$this->getSqsProducer()->send($destination, $sqsMessage);
7479
}
7580
}

pkg/snsqs/Tests/SnsQsProducerTest.php

+55-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Enqueue\SnsQs\SnsQsQueue;
1111
use Enqueue\SnsQs\SnsQsTopic;
1212
use Enqueue\Sqs\SqsContext;
13+
use Enqueue\Sqs\SqsMessage;
1314
use Enqueue\Sqs\SqsProducer;
1415
use Enqueue\Test\ClassExtensionTrait;
1516
use Interop\Queue\Destination;
@@ -124,20 +125,71 @@ public function testShouldSendSnsTopicMessageWithAttributesToSnsProducer()
124125
$producer->send($destination, new SnsQsMessage('', [], [], ['foo' => 'bar']));
125126
}
126127

128+
public function testShouldSendToSnsTopicMessageWithGroupIdAndDeduplicationId()
129+
{
130+
$snsMock = $this->createSnsContextMock();
131+
$snsMock->method('createMessage')->willReturn(new SnsMessage());
132+
$destination = new SnsQsTopic('');
133+
134+
$snsProducerStub = $this->prophesize(SnsProducer::class);
135+
$snsProducerStub->send(
136+
$destination,
137+
Argument::that(function (SnsMessage $snsMessage) {
138+
return 'group-id' === $snsMessage->getMessageGroupId()
139+
&& 'deduplication-id' === $snsMessage->getMessageDeduplicationId();
140+
})
141+
)->shouldBeCalledOnce();
142+
143+
$snsMock->method('createProducer')->willReturn($snsProducerStub->reveal());
144+
145+
$snsMessage = new SnsQsMessage();
146+
$snsMessage->setMessageGroupId('group-id');
147+
$snsMessage->setMessageDeduplicationId('deduplication-id');
148+
149+
$producer = new SnsQsProducer($snsMock, $this->createSqsContextMock());
150+
$producer->send($destination, $snsMessage);
151+
}
152+
127153
public function testShouldSendSqsMessageToSqsProducer()
128154
{
129155
$sqsMock = $this->createSqsContextMock();
156+
$sqsMock->method('createMessage')->willReturn(new SqsMessage());
130157
$destination = new SnsQsQueue('');
131158

132-
$snsProducerStub = $this->prophesize(SqsProducer::class);
133-
$snsProducerStub->send($destination, Argument::any())->shouldBeCalledOnce();
159+
$sqsProducerStub = $this->prophesize(SqsProducer::class);
160+
$sqsProducerStub->send($destination, Argument::any())->shouldBeCalledOnce();
134161

135-
$sqsMock->method('createProducer')->willReturn($snsProducerStub->reveal());
162+
$sqsMock->method('createProducer')->willReturn($sqsProducerStub->reveal());
136163

137164
$producer = new SnsQsProducer($this->createSnsContextMock(), $sqsMock);
138165
$producer->send($destination, new SnsQsMessage());
139166
}
140167

168+
public function testShouldSendToSqsProducerMessageWithGroupIdAndDeduplicationId()
169+
{
170+
$sqsMock = $this->createSqsContextMock();
171+
$sqsMock->method('createMessage')->willReturn(new SqsMessage());
172+
$destination = new SnsQsQueue('');
173+
174+
$sqsProducerStub = $this->prophesize(SqsProducer::class);
175+
$sqsProducerStub->send(
176+
$destination,
177+
Argument::that(function (SqsMessage $sqsMessage) {
178+
return 'group-id' === $sqsMessage->getMessageGroupId()
179+
&& 'deduplication-id' === $sqsMessage->getMessageDeduplicationId();
180+
})
181+
)->shouldBeCalledOnce();
182+
183+
$sqsMock->method('createProducer')->willReturn($sqsProducerStub->reveal());
184+
185+
$sqsMessage = new SnsQsMessage();
186+
$sqsMessage->setMessageGroupId('group-id');
187+
$sqsMessage->setMessageDeduplicationId('deduplication-id');
188+
189+
$producer = new SnsQsProducer($this->createSnsContextMock(), $sqsMock);
190+
$producer->send($destination, $sqsMessage);
191+
}
192+
141193
/**
142194
* @return \PHPUnit\Framework\MockObject\MockObject|SnsContext
143195
*/

0 commit comments

Comments
 (0)