Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ContentBasedDeduplication option when sending SQS messages (#938) #987

Merged
merged 1 commit into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ Such attributes are available as `MessageHeaders` in received messages.
|All
|Set the message system attribute names that will be retrieved with messages on receive operations.
Such attributes are available as `MessageHeaders` in received messages.

|`contentBasedDeduplication`
|ContentBasedDeduplication
|ContentBasedDeduplication
#AUTO
|Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to.
With `ContentBasedDeduplication#AUTO`, the queue attribute value will be resolved automatically.
|===

==== Sending Messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
/**
* Base class for {@link MessagingOperations}
* @param <S> the source message type for conversion
*
* @author Tomaz Fernandes
* @since 3.0
*/
public abstract class AbstractMessagingTemplate<S> implements MessagingOperations, AsyncMessagingOperations {

Expand Down Expand Up @@ -275,35 +278,44 @@ public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointN
@Override
public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointName, Message<T> message) {
String endpointToUse = getEndpointName(endpointName);
Message<T> messageToUse = preProcessMessageForSend(endpointToUse, message);
logger.trace("Sending message {} to endpoint {}", MessageHeaderUtils.getId(message), endpointName);
return doSendAsync(endpointToUse, convertMessageToSend(messageToUse), messageToUse)
.exceptionallyCompose(
t -> CompletableFuture
.failedFuture(new MessagingOperationFailedException(
return preProcessMessageForSendAsync(endpointToUse, message).thenCompose(
messageToUse -> doSendAsync(endpointToUse, convertMessageToSend(messageToUse), messageToUse)
.exceptionallyCompose(
t -> CompletableFuture.failedFuture(new MessagingOperationFailedException(
"Message send operation failed for message %s to endpoint %s"
.formatted(MessageHeaderUtils.getId(message), endpointToUse),
endpointToUse, message, t)))
.whenComplete((v, t) -> logSendMessageResult(endpointToUse, message, t));
.whenComplete((v, t) -> logSendMessageResult(endpointToUse, message, t)));
}

protected abstract <T> Message<T> preProcessMessageForSend(String endpointToUse, Message<T> message);

protected <T> CompletableFuture<Message<T>> preProcessMessageForSendAsync(String endpointToUse,
Message<T> message) {
return CompletableFuture.completedFuture(preProcessMessageForSend(endpointToUse, message));
}

@Override
public <T> CompletableFuture<SendResult.Batch<T>> sendManyAsync(@Nullable String endpointName,
Collection<Message<T>> messages) {
logger.trace("Sending messages {} to endpoint {}", MessageHeaderUtils.getId(messages), endpointName);
String endpointToUse = getEndpointName(endpointName);
Collection<Message<T>> messagesToUse = preProcessMessagesForSend(endpointToUse, messages);
return doSendBatchAsync(endpointToUse, convertMessagesToSend(messagesToUse), messagesToUse)
.exceptionallyCompose(t -> wrapSendException(messagesToUse, endpointToUse, t))
.thenCompose(result -> handleFailedMessages(endpointToUse, result))
.whenComplete((v, t) -> logSendMessageBatchResult(endpointToUse, messagesToUse, t));
return preProcessMessagesForSendAsync(endpointToUse, messages).thenCompose(
messagesToUse -> doSendBatchAsync(endpointToUse, convertMessagesToSend(messagesToUse), messagesToUse)
.exceptionallyCompose(t -> wrapSendException(messagesToUse, endpointToUse, t))
.thenCompose(result -> handleFailedMessages(endpointToUse, result))
.whenComplete((v, t) -> logSendMessageBatchResult(endpointToUse, messagesToUse, t)));
}

protected abstract <T> Collection<Message<T>> preProcessMessagesForSend(String endpointToUse,
Collection<Message<T>> messages);

protected <T> CompletableFuture<Collection<Message<T>>> preProcessMessagesForSendAsync(String endpointToUse,
Collection<Message<T>> messages) {
return CompletableFuture.completedFuture(preProcessMessagesForSend(endpointToUse, messages));
}

private <T> CompletableFuture<SendResult.Batch<T>> handleFailedMessages(String endpointToUse,
SendResult.Batch<T> result) {
return !result.failed().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
Expand All @@ -68,6 +67,14 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
* Sqs-specific implementation of {@link AbstractMessagingTemplate}
*
* @author Tomaz Fernandes
* @author Zhong Xi Lu
*
* @since 3.0
*/
public class SqsTemplate extends AbstractMessagingTemplate<Message> implements SqsOperations, SqsAsyncOperations {

private static final Logger logger = LoggerFactory.getLogger(SqsTemplate.class);
Expand All @@ -86,6 +93,8 @@ public class SqsTemplate extends AbstractMessagingTemplate<Message> implements S

private final Collection<String> messageSystemAttributeNames;

private final TemplateContentBasedDeduplication contentBasedDeduplication;

private SqsTemplate(SqsTemplateBuilderImpl builder) {
super(builder.messageConverter, builder.options);
SqsTemplateOptionsImpl options = builder.options;
Expand All @@ -94,6 +103,7 @@ private SqsTemplate(SqsTemplateBuilderImpl builder) {
this.queueAttributeNames = options.queueAttributeNames;
this.queueNotFoundStrategy = options.queueNotFoundStrategy;
this.messageSystemAttributeNames = options.messageSystemAttributeNames;
this.contentBasedDeduplication = options.contentBasedDeduplication;
}

/**
Expand Down Expand Up @@ -127,6 +137,7 @@ public static SqsOperations newSyncTemplate(SqsAsyncClient sqsAsyncClient) {
/**
* Create a new {@link SqsTemplate} instance with the provided {@link SqsAsyncClient}, only exposing the async
* methods contained in {@link SqsAsyncOperations}.
*
* @param sqsAsyncClient the client.
* @return the new template instance.
*/
Expand Down Expand Up @@ -247,35 +258,61 @@ private Map<String, Object> addAdditionalReceiveHeaders(SqsReceiveOptionsImpl op
@Override
protected <T> org.springframework.messaging.Message<T> preProcessMessageForSend(String endpointToUse,
org.springframework.messaging.Message<T> message) {
return FifoUtils.isFifo(endpointToUse) ? addMissingFifoSendHeaders(endpointToUse, message) : message;
return message;
}

@Override
protected <T> Collection<org.springframework.messaging.Message<T>> preProcessMessagesForSend(String endpointToUse,
Collection<org.springframework.messaging.Message<T>> messages) {
return messages;
}

@Override
protected <T> CompletableFuture<org.springframework.messaging.Message<T>> preProcessMessageForSendAsync(
String endpointToUse, org.springframework.messaging.Message<T> message) {
return FifoUtils.isFifo(endpointToUse)
? messages.stream().map(message -> addMissingFifoSendHeaders(endpointToUse, message)).toList()
: messages;
? endpointHasContentBasedDeduplicationEnabled(endpointToUse)
.thenApply(enabled -> enabled ? addMissingFifoSendHeaders(message, Map.of())
: addMissingFifoSendHeaders(message, getRandomDeduplicationIdHeader()))
: CompletableFuture.completedFuture(message);
}

private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(String endpointName,
org.springframework.messaging.Message<T> message) {
@Override
protected <T> CompletableFuture<Collection<org.springframework.messaging.Message<T>>> preProcessMessagesForSendAsync(
String endpointToUse, Collection<org.springframework.messaging.Message<T>> messages) {
return FifoUtils.isFifo(endpointToUse)
? endpointHasContentBasedDeduplicationEnabled(endpointToUse).thenApply(enabled -> messages.stream()
.map(message -> enabled ? addMissingFifoSendHeaders(message, Map.of())
: addMissingFifoSendHeaders(message, getRandomDeduplicationIdHeader()))
.toList())
: CompletableFuture.completedFuture(messages);
}

Set<QueueAttributeName> additionalAttributes = Set.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
String contentBasedDedupQueueAttribute = getQueueAttributes(endpointName, additionalAttributes).join()
.getQueueAttribute(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(
org.springframework.messaging.Message<T> message, Map<String, Object> additionalHeaders) {
return MessageHeaderUtils.addHeadersIfAbsent(message,
Stream.concat(additionalHeaders.entrySet().stream(), getRandomMessageGroupIdHeader().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

boolean isContentBasedDedup = Boolean.parseBoolean(contentBasedDedupQueueAttribute);
Map<String, Object> defaultHeaders;
if (isContentBasedDedup) {
defaultHeaders = Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString());
}
else {
defaultHeaders = Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString(),
MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
}
private Map<String, String> getRandomMessageGroupIdHeader() {
return Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString());
}

private Map<String, Object> getRandomDeduplicationIdHeader() {
return Map.of(MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
}

private CompletableFuture<Boolean> endpointHasContentBasedDeduplicationEnabled(String endpointName) {
return TemplateContentBasedDeduplication.AUTO.equals(this.contentBasedDeduplication)
? handleAutoDeduplication(endpointName)
: CompletableFuture
.completedFuture(contentBasedDeduplication.equals(TemplateContentBasedDeduplication.ENABLED));
}

return MessageHeaderUtils.addHeadersIfAbsent(message, defaultHeaders);
private CompletableFuture<Boolean> handleAutoDeduplication(String endpointName) {
return getQueueAttributes(endpointName).thenApply(attributes -> Boolean
.parseBoolean(attributes.getQueueAttribute(QueueAttributeName.CONTENT_BASED_DEDUPLICATION)));
}

@Override
Expand Down Expand Up @@ -363,16 +400,20 @@ private <T> SqsMessageConversionContext doGetSqsMessageConversionContext(String
SqsMessageConversionContext conversionContext = new SqsMessageConversionContext();
conversionContext.setSqsAsyncClient(this.sqsAsyncClient);
// At this point we'll already have retrieved and cached the queue attributes
CompletableFuture<QueueAttributes> queueAttributes = getQueueAttributes(newEndpoint);
Assert.isTrue(queueAttributes.isDone(), () -> "Queue attributes not done for " + newEndpoint);
conversionContext.setQueueAttributes(queueAttributes.join());
conversionContext.setQueueAttributes(getAttributesImmediately(newEndpoint));
if (payloadClass != null) {
conversionContext.setPayloadClass(payloadClass);
}
conversionContext.setAcknowledgementCallback(new TemplateAcknowledgementCallback<T>());
return conversionContext;
}

private QueueAttributes getAttributesImmediately(String newEndpoint) {
CompletableFuture<QueueAttributes> queueAttributes = getQueueAttributes(newEndpoint);
Assert.isTrue(queueAttributes.isDone(), () -> "Queue attributes not done for " + newEndpoint);
return queueAttributes.join();
}

private CompletableFuture<SendMessageBatchRequest> createSendMessageBatchRequest(String endpointName,
Collection<Message> messages) {
return getQueueAttributes(endpointName)
Expand Down Expand Up @@ -423,21 +464,23 @@ private boolean isSkipAttribute(MessageSystemAttributeName name) {
}

private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName) {
return getQueueAttributes(endpointName, Collections.emptySet());
return this.queueAttributesCache.computeIfAbsent(endpointName,
newName -> doGetQueueAttributes(endpointName, newName));
}

private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName,
Set<QueueAttributeName> additionalAttributes) {
return this.queueAttributesCache.computeIfAbsent(endpointName, newName -> {
// ensure we have the content based dedupe config to determine default fifo send headers
Set<QueueAttributeName> namesToRequest = new HashSet<>(queueAttributeNames);
if (additionalAttributes != null && !additionalAttributes.isEmpty()) {
namesToRequest.addAll(additionalAttributes);
}
return QueueAttributesResolver.builder().sqsAsyncClient(this.sqsAsyncClient).queueName(newName)
.queueNotFoundStrategy(this.queueNotFoundStrategy).queueAttributeNames(namesToRequest).build()
.resolveQueueAttributes();
});
private CompletableFuture<QueueAttributes> doGetQueueAttributes(String endpointName, String newName) {
return QueueAttributesResolver.builder().sqsAsyncClient(this.sqsAsyncClient).queueName(newName)
.queueNotFoundStrategy(this.queueNotFoundStrategy)
.queueAttributeNames(maybeAddContentBasedDeduplicationAttribute(endpointName)).build()
.resolveQueueAttributes();
}

private Collection<QueueAttributeName> maybeAddContentBasedDeduplicationAttribute(String endpointName) {
return FifoUtils.isFifo(endpointName)
&& TemplateContentBasedDeduplication.AUTO.equals(this.contentBasedDeduplication)
? Stream.concat(queueAttributeNames.stream(),
Stream.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION)).toList()
: queueAttributeNames;
}

@Override
Expand Down Expand Up @@ -591,6 +634,8 @@ private static class SqsTemplateOptionsImpl extends AbstractMessagingTemplateOpt

private Collection<String> messageSystemAttributeNames = Collections.singletonList("All");

private TemplateContentBasedDeduplication contentBasedDeduplication = TemplateContentBasedDeduplication.AUTO;

@Override
public SqsTemplateOptions queueAttributeNames(Collection<QueueAttributeName> queueAttributeNames) {
Assert.notEmpty(queueAttributeNames, "queueAttributeNames cannot be null or empty");
Expand Down Expand Up @@ -625,6 +670,13 @@ public SqsTemplateOptions messageSystemAttributeNames(
return this;
}

@Override
public SqsTemplateOptions contentBasedDeduplication(
TemplateContentBasedDeduplication contentBasedDeduplication) {
this.contentBasedDeduplication = contentBasedDeduplication;
return this;
}

}

private static class SqsTemplateBuilderImpl implements SqsTemplateBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,13 @@ public interface SqsTemplateOptions extends MessagingTemplateOptions<SqsTemplate
*/
SqsTemplateOptions messageSystemAttributeNames(Collection<MessageSystemAttributeName> messageSystemAttributeNames);

/**
* Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to. By
* default, this is set to AUTO and the queue attribute value will be resolved automatically per queue. If set to
* ENABLED or DISABLED, the value will apply to all queues.
*
* @param contentBasedDeduplication the ContentBasedDeduplication value.
* @return the options instance.
*/
SqsTemplateOptions contentBasedDeduplication(TemplateContentBasedDeduplication contentBasedDeduplication);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.operations;

/**
* The ContentBasedDeduplication queue attribute value to be used by the {@link SqsTemplate} when sending messages to a
* FIFO queue.
*
* @author Zhong Xi Lu
* @since 3.0.4
*/
public enum TemplateContentBasedDeduplication {

/**
* The ContentBasedDeduplication queue attribute value will be resolved automatically at runtime.
*/
AUTO,

/**
* ContentBasedDeduplication is enabled on all FIFO SQS queues.
*/
ENABLED,

/**
* ContentBasedDeduplication is disabled on all FIFO SQS queues.
*/
DISABLED
}
Loading
Loading