Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-67277: Add ability to exclude headers on consumers #279

Merged
merged 3 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Solace Spring Cloud Build</name>
Expand All @@ -32,7 +32,7 @@
<!-- Remove this if the next version of solace-spring-boot works fine -->
<spring.boot.version>3.1.5</spring.boot.version>

<solace.spring.cloud.stream-starter.version>4.1.1-SNAPSHOT</solace.spring.cloud.stream-starter.version>
<solace.spring.cloud.stream-starter.version>4.2.0-SNAPSHOT</solace.spring.cloud.stream-starter.version>

<solace.integration.test.support.version>1.0.2</solace.integration.test.support.version>
<solace.integration.test.support.fetch_checkout.skip>false</solace.integration.test.support.fetch_checkout.skip>
Expand Down
7 changes: 4 additions & 3 deletions solace-spring-cloud-bom/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Consult the table below to determine which version of the BOM you need to use:
| 2021.0.6 | 2.5.0 | 2.7.x |
| 2022.0.2 | 3.0.0 | 3.0.x |
| 2022.0.4 | 3.1.0 | 3.1.x |
| 2022.0.4 | 3.2.0 | 3.1.x |

## Including the BOM

Expand All @@ -38,7 +39,7 @@ In addition to showing how to include the BOM, the following snippets also shows
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -66,7 +67,7 @@ apply plugin: 'io.spring.dependency-management'

dependencyManagement {
imports {
mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:3.1.0"
mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:3.2.0"
}
}

Expand All @@ -78,7 +79,7 @@ dependencies {
### Using it with Gradle 5
```groovy
dependencies {
implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:3.1.0"))
implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:3.2.0"))
implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace")
}
```
4 changes: 2 additions & 2 deletions solace-spring-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>solace-spring-cloud-bom</artifactId>
<packaging>pom</packaging>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>

<name>Solace Spring Cloud BOM</name>
<description>BOM for Solace Spring Cloud</description>
Expand Down
2 changes: 1 addition & 1 deletion solace-spring-cloud-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Spring Cloud Stream Binder for Solace PubSub+
:revnumber: 4.1.0
:revnumber: 4.2.0
:toc: preamble
:toclevels: 3
:icons: font
Expand Down Expand Up @@ -346,6 +346,11 @@ The number of milliseconds before republished messages are discarded or moved to
+
Default: `null`

headerExclusions::
The list of headers to exclude when converting consumed Solace message to Spring message.
+
Default: Empty `List&lt;String&gt;`

==== Solace Producer Properties

The following properties are available for Solace producers only and must be prefixed with `spring.cloud.stream.solace.bindings.&lt;bindingName&gt;.producer.` where `bindingName` looks something like `functionName-out-0` as defined in https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_functional_binding_names[Functional Binding Names].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
<packaging>jar</packaging>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-stream-binder-solace-core</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Solace Spring Cloud Stream Binder Core</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ private void processBatchIfAvailable() {

Message<?> createOneMessage(BytesXMLMessage bytesXMLMessage, AcknowledgmentCallback acknowledgmentCallback) {
setAttributesIfNecessary(bytesXMLMessage, acknowledgmentCallback);
return xmlMessageMapper.map(bytesXMLMessage, acknowledgmentCallback);
return xmlMessageMapper.map(bytesXMLMessage, acknowledgmentCallback, consumerProperties.getExtension());
}

Message<?> createBatchMessage(List<BytesXMLMessage> bytesXMLMessages,
AcknowledgmentCallback acknowledgmentCallback) {
setAttributesIfNecessary(bytesXMLMessages, acknowledgmentCallback);
return xmlMessageMapper.mapBatchMessage(bytesXMLMessages, acknowledgmentCallback);
return xmlMessageMapper.mapBatchMessage(bytesXMLMessages, acknowledgmentCallback, consumerProperties.getExtension());
}

void sendOneToConsumer(final Message<?> message, final BytesXMLMessage bytesXMLMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected Object doReceive() {
private Message<?> processMessage(MessageContainer messageContainer) {
AcknowledgmentCallback acknowledgmentCallback = ackCallbackFactory.createCallback(messageContainer);
try {
return xmlMessageMapper.map(messageContainer.getMessage(), acknowledgmentCallback, true);
return xmlMessageMapper.map(messageContainer.getMessage(), acknowledgmentCallback, true, consumerProperties.getExtension());
} catch (Exception e) {
//TODO If one day the errorChannel or attributesHolder can be retrieved, use those instead
logger.warn(e, String.format("XMLMessage %s cannot be consumed. It will be rejected",
Expand All @@ -188,7 +188,7 @@ private Message<List<?>> processBatchIfAvailable() {
return xmlMessageMapper.mapBatchMessage(batchedMessages.get()
.stream()
.map(MessageContainer::getMessage)
.collect(Collectors.toList()), acknowledgmentCallback, true);
.collect(Collectors.toList()), acknowledgmentCallback, true, consumerProperties.getExtension());
} catch (Exception e) {
logger.warn(e, "Message batch cannot be consumed. It will be rejected");
AckUtils.reject(acknowledgmentCallback);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.solace.spring.cloud.stream.binder.properties;

import com.solacesystems.jcsmp.EndpointProperties;
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -133,6 +135,10 @@ public class SolaceConsumerProperties extends SolaceCommonProperties {
private Long errorMsgTtl = null;
// ------------------------

/**
* The list of headers to exclude when converting consumed Solace message to Spring message.
*/
private List<String> headerExclusions = new ArrayList<>();

public int getBatchMaxSize() {
return batchMaxSize;
Expand Down Expand Up @@ -317,4 +323,12 @@ public Long getErrorMsgTtl() {
public void setErrorMsgTtl(Long errorMsgTtl) {
this.errorMsgTtl = errorMsgTtl;
}

public List<String> getHeaderExclusions() {
return headerExclusions;
}

public void setHeaderExclusions(List<String> headerExclusions) {
this.headerExclusions = headerExclusions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,18 @@ public XMLMessage map(Message<?> message, Collection<String> excludedHeaders, bo
}

public Message<List<?>> mapBatchMessage(List<? extends XMLMessage> xmlMessages,
AcknowledgmentCallback acknowledgmentCallback)
AcknowledgmentCallback acknowledgmentCallback, SolaceConsumerProperties solaceConsumerProperties)
throws SolaceMessageConversionException {
return mapBatchMessage(xmlMessages, acknowledgmentCallback, false);
return mapBatchMessage(xmlMessages, acknowledgmentCallback, false, solaceConsumerProperties);
}

public Message<List<?>> mapBatchMessage(List<? extends XMLMessage> xmlMessages,
AcknowledgmentCallback acknowledgmentCallback,
boolean setRawMessageHeader) throws SolaceMessageConversionException {
boolean setRawMessageHeader, SolaceConsumerProperties solaceConsumerProperties) throws SolaceMessageConversionException {
List<Map<String, Object>> batchedHeaders = new ArrayList<>();
List<Object> batchedPayloads = new ArrayList<>();
for (XMLMessage xmlMessage : xmlMessages) {
Message<?> message = mapInternal(xmlMessage).build();
Message<?> message = mapInternal(xmlMessage, solaceConsumerProperties).build();
batchedHeaders.add(message.getHeaders());
batchedPayloads.add(message.getPayload());
}
Expand All @@ -186,20 +186,21 @@ public Message<List<?>> mapBatchMessage(List<? extends XMLMessage> xmlMessages,
.build();
}

public Message<?> map(XMLMessage xmlMessage, AcknowledgmentCallback acknowledgmentCallback)
public Message<?> map(XMLMessage xmlMessage, AcknowledgmentCallback acknowledgmentCallback, SolaceConsumerProperties solaceConsumerProperties)
throws SolaceMessageConversionException {
return map(xmlMessage, acknowledgmentCallback, false);
return map(xmlMessage, acknowledgmentCallback, false, solaceConsumerProperties);
}

public Message<?> map(XMLMessage xmlMessage, AcknowledgmentCallback acknowledgmentCallback,
boolean setRawMessageHeader) throws SolaceMessageConversionException {
return injectRootMessageHeaders(mapInternal(xmlMessage), acknowledgmentCallback, setRawMessageHeader ?
boolean setRawMessageHeader, SolaceConsumerProperties solaceConsumerProperties) throws SolaceMessageConversionException {
return injectRootMessageHeaders(mapInternal(xmlMessage, solaceConsumerProperties), acknowledgmentCallback, setRawMessageHeader ?
xmlMessage : null).build();
}

private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage)
private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage, SolaceConsumerProperties solaceConsumerProperties)
throws SolaceMessageConversionException {
SDTMap metadata = xmlMessage.getProperties();
List<String> excludedHeaders = solaceConsumerProperties.getHeaderExclusions();

Object payload;
if (xmlMessage instanceof BytesMessage) {
Expand Down Expand Up @@ -247,7 +248,7 @@ private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage)

AbstractIntegrationMessageBuilder<?> builder = MESSAGE_BUILDER_FACTORY
.withPayload(payload)
.copyHeaders(map(metadata))
.copyHeaders(map(metadata, excludedHeaders))
.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, xmlMessage.getHTTPContentType());

if (isNullPayload) {
Expand All @@ -261,6 +262,9 @@ private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage)
if (!header.getValue().isReadable()) {
continue;
}
if (excludedHeaders != null && excludedHeaders.contains(header.getKey())) {
continue;
}
if (ignoredHeaderProperties.contains(header.getKey())) {
continue;
}
Expand Down Expand Up @@ -320,15 +324,19 @@ SDTMap map(MessageHeaders headers, Collection<String> excludedHeaders, boolean c
return metadata;
}

MessageHeaders map(SDTMap metadata) {
MessageHeaders map(SDTMap metadata, Collection<String> excludedHeaders) {
if (metadata == null) {
return new MessageHeaders(Collections.emptyMap());
}

final Collection<String> exclusionList =
excludedHeaders != null ? excludedHeaders : Collections.emptyList();

Map<String,Object> headers = new HashMap<>();

// Deserialize headers
if (metadata.containsKey(SolaceBinderHeaders.SERIALIZED_HEADERS)) {
if (!exclusionList.contains(SolaceBinderHeaders.SERIALIZED_HEADERS) &&
metadata.containsKey(SolaceBinderHeaders.SERIALIZED_HEADERS)) {
Encoder encoder = null;
if (metadata.containsKey(SolaceBinderHeaders.SERIALIZED_HEADERS_ENCODING)) {
String encoding = rethrowableCall(metadata::getString, SolaceBinderHeaders.SERIALIZED_HEADERS_ENCODING);
Expand All @@ -345,7 +353,9 @@ MessageHeaders map(SDTMap metadata) {
rethrowableCall(metadata::getString, SolaceBinderHeaders.SERIALIZED_HEADERS));

for (String headerName : serializedHeaders) {
if (metadata.containsKey(headerName)) {
if (exclusionList.contains(headerName)) {
continue;
} else if (metadata.containsKey(headerName)) {
byte[] serializedValue = encoder != null ?
encoder.decode(rethrowableCall(metadata::getString, headerName)) :
rethrowableCall(metadata::getBytes, headerName);
Expand All @@ -359,6 +369,7 @@ MessageHeaders map(SDTMap metadata) {
}

metadata.keySet().stream()
.filter(h -> !exclusionList.contains(h))
.filter(h -> !headers.containsKey(h))
.filter(h -> !SolaceBinderHeaderMeta.META.containsKey(h))
.filter(h -> !SolaceHeaderMeta.META.containsKey(h))
Expand All @@ -370,7 +381,8 @@ MessageHeaders map(SDTMap metadata) {
headers.put(h, value);
});

if (metadata.containsKey(SolaceBinderHeaders.MESSAGE_VERSION)) {
if (!exclusionList.contains(SolaceBinderHeaders.MESSAGE_VERSION) &&
metadata.containsKey(SolaceBinderHeaders.MESSAGE_VERSION)) {
int messageVersion = rethrowableCall(metadata::getInteger, SolaceBinderHeaders.MESSAGE_VERSION);
headers.put(SolaceBinderHeaders.MESSAGE_VERSION, messageVersion);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.solace.spring.cloud.stream.binder.properties;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class SolaceConsumerPropertiesTest {
@ParameterizedTest
Expand Down Expand Up @@ -81,4 +83,9 @@ public void testFailSetFlowRebindBackOffMultiplier(double multiplier) {
assertThrows(IllegalArgumentException.class, () -> new SolaceConsumerProperties()
.setFlowRebindBackOffMultiplier(multiplier));
}

@Test
void testDefaultHeaderExclusionsListIsEmpty() {
assertTrue(new SolaceConsumerProperties().getHeaderExclusions().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.solace.spring.cloud.stream.binder.messaging.HeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.test.util.SerializableFoo;
import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
import com.solacesystems.jcsmp.BytesMessage;
Expand Down Expand Up @@ -339,6 +340,7 @@ public void testPayloadFromJmsToSpring(JCSMPSession jcsmpSession, SoftAssertions
messages.add(mapMessage);
}

SolaceConsumerProperties consumerProperties = new SolaceConsumerProperties();
XMLMessageConsumer messageConsumer = null;
try {
Set<Class<? extends XMLMessage>> processedMessageTypes = new HashSet<>();
Expand All @@ -349,7 +351,7 @@ public void testPayloadFromJmsToSpring(JCSMPSession jcsmpSession, SoftAssertions
public void onReceive(BytesXMLMessage bytesXMLMessage) {
logger.info("Got message " + bytesXMLMessage);
try {
Message<?> msg = xmlMessageMapper.map(bytesXMLMessage, null);
Message<?> msg = xmlMessageMapper.map(bytesXMLMessage, null, consumerProperties);
if (msg.getPayload() instanceof byte[]) {
softly.assertThat(msg.getPayload()).isEqualTo("test".getBytes());
processedMessageTypes.add(BytesMessage.class);
Expand Down Expand Up @@ -428,6 +430,7 @@ public void testSerializedPayloadFromJmsToSpring(JCSMPSession jcsmpSession, Soft
ObjectMessage message = jmsSession.createObjectMessage(payload);
message.setBooleanProperty(SolaceBinderHeaders.SERIALIZED_PAYLOAD, true);

SolaceConsumerProperties consumerProperties = new SolaceConsumerProperties();
XMLMessageConsumer messageConsumer = null;
try {
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
Expand All @@ -437,7 +440,7 @@ public void testSerializedPayloadFromJmsToSpring(JCSMPSession jcsmpSession, Soft
public void onReceive(BytesXMLMessage bytesXMLMessage) {
logger.info("Got message " + bytesXMLMessage);
try {
softly.assertThat(xmlMessageMapper.map(bytesXMLMessage, null).getPayload())
softly.assertThat(xmlMessageMapper.map(bytesXMLMessage, null, consumerProperties).getPayload())
.isEqualTo(payload);
} catch (Exception e) {
exceptionAtomicReference.set(e);
Expand Down
Loading
Loading