diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index b7bdb0d7..a45a7075 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -52,15 +52,12 @@ jobs: **/target/failsafe-reports/*.xml **/target/surefire-reports/*.xml - name: Publish Unit Test Results - if: (success() || failure()) && (github.actor != 'dependabot[bot]' || (github.event_name == 'push' && !contains(github.ref, 'dependabot'))) - uses: EnricoMi/publish-unit-test-result-action@v1 + if: ${{ !cancelled() && (github.actor != 'dependabot[bot]' || (github.event_name == 'push' && !contains(github.ref, 'dependabot'))) }} + uses: EnricoMi/publish-unit-test-result-action@v2 continue-on-error: true with: - check_name: Unit Test Results - comment_mode: create new fail_on: nothing - hide_comments: orphaned commits - files: | + junit_files: | **/target/failsafe-reports/*.xml !**/target/failsafe-reports/failsafe-summary.xml **/target/surefire-reports/*.xml diff --git a/README.md b/README.md index 7fd94343..18ac1f74 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ TEST_SOLACE_MGMT_PASSWORD=admin #### Parallel Test Execution -Parallel test execution is enabled by default. Add the `-Djunit.jupiter.execution.parallel.enabled=false` option to your command to disable parallel test execution. +Parallel test execution is disabled by default. Add the `-Djunit.jupiter.execution.parallel.enabled=true` option to your command to enable parallel test execution. ## Release Process diff --git a/pom.xml b/pom.xml index 6e1e6ee0..c9d90f17 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.boot solace-spring-boot-bom - 1.2.2 + 1.3.0 com.solace.spring.cloud solace-spring-cloud-build - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT pom Solace Spring Cloud Build @@ -21,7 +21,7 @@ SolaceProducts - 2021.0.4 + 2021.0.6 2.2.13.RELEASE @@ -29,14 +29,14 @@ - 1.2.2 + 1.3.0 - 2.7.3 + 2.7.11 - 4.3.12-SNAPSHOT - 3.4.3-SNAPSHOT + 4.3.10-SNAPSHOT + 3.5.0-SNAPSHOT 0.9.1 false @@ -376,4 +376,4 @@ - \ No newline at end of file + diff --git a/solace-spring-cloud-bom/README.md b/solace-spring-cloud-bom/README.md index 7f6d4474..7a1b01d9 100644 --- a/solace-spring-cloud-bom/README.md +++ b/solace-spring-cloud-bom/README.md @@ -23,6 +23,7 @@ Consult the table below to determine which version of the BOM you need to use: | 2020.0.1 | 2.0.0, 2.1.0, 2.2.0, 2.2.1 | 2.4.x | | 2021.0.1 | 2.3.0, 2.3.1, 2.3.2 | 2.6.x | | 2021.0.4 | 2.4.0 | 2.7.x | +| 2021.0.6 | 2.5.0 | 2.7.x | ## Including the BOM @@ -35,7 +36,7 @@ In addition to showing how to include the BOM, the following snippets also shows com.solace.spring.cloud solace-spring-cloud-bom - 2.4.0 + 2.5.0 pom import @@ -63,7 +64,7 @@ apply plugin: 'io.spring.dependency-management' dependencyManagement { imports { - mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:2.4.0" + mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:2.5.0" } } @@ -75,7 +76,7 @@ dependencies { ### Using it with Gradle 5 ```groovy dependencies { - implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:2.4.0")) + implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:2.5.0")) implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace") } ``` diff --git a/solace-spring-cloud-bom/pom.xml b/solace-spring-cloud-bom/pom.xml index eded4900..b543b0ac 100644 --- a/solace-spring-cloud-bom/pom.xml +++ b/solace-spring-cloud-bom/pom.xml @@ -5,13 +5,13 @@ com.solace.spring.cloud solace-spring-cloud-build - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT ../pom.xml solace-spring-cloud-bom pom - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT Solace Spring Cloud BOM BOM for Solace Spring Cloud @@ -53,4 +53,4 @@ - \ No newline at end of file + diff --git a/solace-spring-cloud-connector/README.md b/solace-spring-cloud-connector/README.md index 217b9c69..edf52d13 100644 --- a/solace-spring-cloud-connector/README.md +++ b/solace-spring-cloud-connector/README.md @@ -76,7 +76,7 @@ Include version 4.0.0 or later to use Spring Boot release 2.x ``` // Solace Cloud -compile("com.solace.cloud.cloudfoundry:solace-spring-cloud-connector:4.3.9") +compile("com.solace.cloud.cloudfoundry:solace-spring-cloud-connector:4.3.10") ``` ### Using it with Maven @@ -86,7 +86,7 @@ compile("com.solace.cloud.cloudfoundry:solace-spring-cloud-connector:4.3.9") com.solace.cloud.cloudfoundry solace-spring-cloud-connector - 4.3.9 + 4.3.10 ``` diff --git a/solace-spring-cloud-connector/pom.xml b/solace-spring-cloud-connector/pom.xml index 88499376..9b24d4fd 100644 --- a/solace-spring-cloud-connector/pom.xml +++ b/solace-spring-cloud-connector/pom.xml @@ -21,7 +21,7 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT ../solace-spring-cloud-parent/pom.xml @@ -29,7 +29,7 @@ com.solace.cloud.cloudfoundry solace-spring-cloud-connector jar - 4.3.12-SNAPSHOT + 4.3.10-SNAPSHOT Spring Cloud Connectors for Solace PubSub+ on Cloud Foundry Spring Cloud Connectors for Solace PubSub+ on Cloud Foundry. https://github.com/${repoName}/solace-spring-cloud/tree/${project.scm.tag}/solace-spring-cloud-connector diff --git a/solace-spring-cloud-parent/pom.xml b/solace-spring-cloud-parent/pom.xml index 510715b1..55b296dd 100644 --- a/solace-spring-cloud-parent/pom.xml +++ b/solace-spring-cloud-parent/pom.xml @@ -5,7 +5,7 @@ com.solace.spring.cloud solace-spring-cloud-build - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT ../pom.xml @@ -23,8 +23,8 @@ 1.8 1.8 - 10.16.0 - 10.16.0 + 10.19.0 + 10.19.0 @@ -36,7 +36,7 @@ org.apache.logging.log4j log4j-bom - 2.19.0 + 2.20.0 pom import @@ -96,7 +96,7 @@ - junit.jupiter.execution.parallel.enabled=true + junit.jupiter.execution.parallel.enabled=false @@ -116,7 +116,7 @@ - junit.jupiter.execution.parallel.enabled=true + junit.jupiter.execution.parallel.enabled=false @@ -126,4 +126,4 @@ - \ No newline at end of file + diff --git a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc index 034802f6..542f2f29 100644 --- a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc +++ b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc @@ -1,9 +1,9 @@ = Spring Cloud Stream Binder for Solace PubSub+ -:revnumber: 3.4.0 +:revnumber: 3.5.0 :toc: preamble :toclevels: 3 :icons: font -:scst-version: 3.2.5 +:scst-version: 3.2.7 // Github-Specific Settings ifdef::env-github[] @@ -413,6 +413,15 @@ The following properties are available for Solace producers only and must be pre See link:../../solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceCommonProperties.java[SolaceCommonProperties] and link:../../solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceProducerProperties.java[SolaceProducerProperties] for the most updated list. +destinationType:: +Specifies whether the configured `destination` is a `topic` or a `queue`. ++ +When set to `topic`, the `destination` name is a topic subscription added on a queue. ++ +When set to `queue`, the producer binds to a queue matching the `destination` name. The queue can be auto-provisioned with `provisionDurableQueue=true` however, all naming prefix and queue name generation options do not apply. A queue will be provisioned using the `destination` name explicitly. ++ +Default: `topic` + headerExclusions:: The list of headers to exclude from the published message. Excluding Solace message headers is not supported. + @@ -426,7 +435,7 @@ Default: `false` IMPORTANT: Non-serializable headers should have a meaningful `toString()` implementation. Otherwise enabling this feature may result in potential data loss. provisionDurableQueue:: -Whether to provision durable queues for non-anonymous consumer groups. This should only be set to `false` if you have externally pre-provisioned the required queue on the message broker. +Whether to provision durable queues for non-anonymous consumer groups or queue destinations. This should only be set to `false` if you have externally pre-provisioned the required queue on the message broker. + Default: `true` + See: <> @@ -435,6 +444,8 @@ addDestinationAsSubscriptionToQueue:: Whether to add the Destination as a subscription to queue during provisioning. + Default: `true` ++ +NOTE: Does not apply when `destinationType=queue`. provisionSubscriptionsToDurableQueue:: Whether to add topic subscriptions to durable queues for non-anonymous consumer groups. This should only be set to `false` if you have externally pre-added the required topic subscriptions (the destination topic should be added at minimum) on the consumer group's queue on the message broker. This property also applies to topics added by the `queueAdditionalSubscriptions` property. @@ -490,39 +501,39 @@ See: <> WARNING: **Deprecated:** Since version 3.3.0, this property is deprecated in favor of `queueNameExpression` and `queueNameExpressionsForRequiredGroups`. The destination encoding can be removed from queue names by removing it directly from these SpEL expressions. queueAccessType:: -Access type for the required consumer group queue. +Access type for binder provisioned queues. + Default: `0` (ACCESSTYPE_NONEXCLUSIVE) + See: https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/constant-values.html#com.solacesystems.jcsmp.EndpointProperties.ACCESSTYPE_EXCLUSIVE[The `ACCESSTYPE_` prefixed constants for other possible values] queuePermission:: -Permissions for the required consumer group queue. +Permissions for binder provisioned queues. + Default: `2` (PERMISSION_CONSUME) + -See: https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/constant-values.html#com.solacesystems.jcsmp.EndpointProperties.ACCESSTYPE_EXCLUSIVE[The `PERMISSION_` prefixed constants for other possible values] +See: https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/constant-values.html#com.solacesystems.jcsmp.EndpointProperties.PERMISSION_CONSUME[The `PERMISSION_` prefixed constants for other possible values] queueDiscardBehaviour:: -If specified, whether to notify sender if a message fails to be enqueued to the required consumer group queue. +Queue discard behaviour for binder provisioned queues. Whether to notify sender if a message fails to be enqueued to the endpoint. A null value means use the appliance default. + Default: `null` queueMaxMsgRedelivery:: -Sets the maximum message redelivery count on the required consumer group queue. (Zero means retry forever). +Sets the maximum message redelivery count for binder provisioned queues. (Zero means retry forever). + Default: `null` queueMaxMsgSize:: -Maximum message size for the required consumer group queue. +Maximum message size for binder provisioned queues. + Default: `null` queueQuota:: -Message spool quota for the required consumer group queue. +Message spool quota for binder provisioned queues. + Default: `null` queueRespectsMsgTtl:: -Whether the required consumer group queue respects Message TTL. +Whether the binder provisioned queues respect Message TTL. + Default: `null` @@ -532,6 +543,8 @@ These subscriptions may also contain wildcards. + Default: Empty `Map<String,String[]>` + See: <> for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Streams consumer groups. ++ +NOTE: Does not apply when `destinationType=queue`. === Solace Message Headers @@ -733,6 +746,22 @@ The consolidated list of message headers for a batch of messages where the heade | Internal Binder Use Only | "base64" | The encoding algorithm used to encode the headers indicated by `solace_scst_serializedHeaders`. + +| solace_scst_targetDestinationType +| String +| Write +| +| Only applicable when `scst_targetDestination` is set. + +*topic* + +Specifies that the dynamic destination is a topic + +*queue* + +Specifies that the dynamic destination is a queue + +When absent, the binding’s configured destination-type is used. |=== == Native Payload Types @@ -972,7 +1001,9 @@ This property can be configured for dynamically created queues by using https:// Spring Cloud Stream has a reserved message header called `scst_targetDestination` (retrievable via `BinderHeaders.TARGET_DESTINATION`), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header. -For this binder's implementation of this header, the target destination defines the _exact_ Solace topic to which a message will be sent. i.e. No post-processing is done for this header. +For this binder's implementation of this header, the target destination defines the _exact_ Solace topic or queue to which a message will be sent. i.e. No post-processing is done. + +This binder also adds a reserved message header called `solace_scst_targetDestinationType` (retrievable via `SolaceBinderHeaders.TARGET_DESTINATION_TYPE`), which allows to override the configured producer `destination-type`. [source,java] ---- @@ -980,13 +1011,15 @@ public class MyMessageBuilder { public Message buildMeAMessage() { return MessageBuilder.withPayload("payload") .setHeader(BinderHeaders.TARGET_DESTINATION, "some-dynamic-destination") // <1> + .setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, "topic") // <2> .build(); } } ---- -<1> This message will be sent to the `some-dynamic-destination` topic, ignoring the producer's configured destination +<1> This message will be sent to the `some-dynamic-destination` topic, ignoring the producer's configured destination. +<2> Optionally, the configured producer `destination-type` can be overridden. -NOTE: This header is cleared by the message's producer before it is sent off to the message broker. So you should attach the target destination to your message payload if you want to get that information on the consumer-side. +NOTE: Those 2 headers are cleared from the message before it is sent off to the message broker. So you should attach that information to your message payload if you want to get that information on the consumer-side. == Failed Consumer Message Error Handling diff --git a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml index 7462cbd9..45f1c2ce 100644 --- a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml +++ b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT ../../solace-spring-cloud-parent/pom.xml spring-cloud-starter-stream-solace - 3.4.3-SNAPSHOT + 3.5.0-SNAPSHOT jar diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml index eb822f47..4c73cf6e 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT ../../solace-spring-cloud-parent/pom.xml spring-cloud-stream-binder-solace-core - 3.4.3-SNAPSHOT + 3.5.0-SNAPSHOT jar Solace Spring Cloud Stream Binder Core diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListener.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListener.java index 4b2b1642..d0bd73f3 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListener.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListener.java @@ -15,6 +15,7 @@ import com.solacesystems.jcsmp.ClosedFacilityException; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPTransportException; +import com.solacesystems.jcsmp.StaleSessionException; import com.solacesystems.jcsmp.XMLMessage; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -99,6 +100,8 @@ public void run() { consumerDestination.getName()), e); } } + } catch (StaleSessionException e) { + logger.error("Session has lost connection", e); } catch (Throwable t) { logger.error(String.format("Received unexpected error while consuming from destination %s", consumerDestination.getName()), t); @@ -113,7 +116,7 @@ private boolean keepPolling() { return !stopFlag.get() && !remoteStopFlag.get(); } - private void receive() throws UnboundFlowReceiverContainerException { + private void receive() throws UnboundFlowReceiverContainerException, StaleSessionException { MessageContainer messageContainer; try { @@ -122,6 +125,8 @@ private void receive() throws UnboundFlowReceiverContainerException { } else { messageContainer = flowReceiverContainer.receive(); } + } catch (StaleSessionException e) { + throw e; } catch (JCSMPException e) { String msg = String.format("Received error while trying to read message from endpoint %s", flowReceiverContainer.getQueueName()); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java index 365b7340..65144160 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java @@ -15,7 +15,8 @@ public class SolaceBinderHeaderMeta implements HeaderMeta { {SolaceBinderHeaders.SERIALIZED_HEADERS_ENCODING, new SolaceBinderHeaderMeta<>(String.class, false, false, Scope.WIRE)}, {SolaceBinderHeaders.CONFIRM_CORRELATION, new SolaceBinderHeaderMeta<>(CorrelationData.class, false, false, Scope.LOCAL)}, {SolaceBinderHeaders.NULL_PAYLOAD, new SolaceBinderHeaderMeta<>(Boolean.class, true, false, Scope.LOCAL)}, - {SolaceBinderHeaders.BATCHED_HEADERS, new SolaceBinderHeaderMeta<>(List.class, true, false, Scope.LOCAL)} + {SolaceBinderHeaders.BATCHED_HEADERS, new SolaceBinderHeaderMeta<>(List.class, true, false, Scope.LOCAL)}, + {SolaceBinderHeaders.TARGET_DESTINATION_TYPE, new SolaceBinderHeaderMeta<>(String.class, false, false, Scope.LOCAL)} }).collect(Collectors.toMap(d -> (String) d[0], d -> (SolaceBinderHeaderMeta) d[1])); private final Class type; @@ -35,11 +36,17 @@ public Class getType() { return type; } + /** + * The readable property is only used by tests and doesn't necessarily reflect whether a header can be read by an application or not + */ @Override public boolean isReadable() { return readable; } + /** + * The writable property is only used by tests and doesn't necessarily reflect whether a header can be written by an application or not + */ @Override public boolean isWritable() { return writable; diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java index a7379831..d0d07782 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java @@ -84,4 +84,17 @@ public final class SolaceBinderHeaders { * batch of messages where the headers for each payload element is in this list’s corresponding index.

*/ public static final String BATCHED_HEADERS = PREFIX + "batchedHeaders"; + + /** + *

Acceptable Value Type: String

+ *

Access: Write

+ *
+ *

Only applicable when {@code scst_targetDestination} is set.

+ *
    + *
  • topic: Specifies that the dynamic destination is a topic
  • + *
  • queue: Specifies that the dynamic destination is a queue
  • + *
+ *

When absent, the binding’s configured destination-type is used.

+ */ + public static final String TARGET_DESTINATION_TYPE = PREFIX + "targetDestinationType"; } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java index 06b6b7dc..88ad37f9 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java @@ -5,9 +5,11 @@ import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties; import com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException; import com.solace.spring.cloud.stream.binder.util.CorrelationData; +import com.solace.spring.cloud.stream.binder.util.DestinationType; import com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey; import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager; import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper; +import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPSession; @@ -32,7 +34,8 @@ public class JCSMPOutboundMessageHandler implements MessageHandler, Lifecycle { private final String id = UUID.randomUUID().toString(); - private final Topic topic; + private final DestinationType configDestinationType; + private final Destination configDestination; private final JCSMPSession jcsmpSession; private final MessageChannel errorChannel; private final JCSMPSessionProducerManager producerManager; @@ -51,7 +54,10 @@ public JCSMPOutboundMessageHandler(ProducerDestination destination, JCSMPSessionProducerManager producerManager, ExtendedProducerProperties properties, @Nullable SolaceMeterAccessor solaceMeterAccessor) { - this.topic = JCSMPFactory.onlyInstance().createTopic(destination.getName()); + this.configDestinationType = properties.getExtension().getDestinationType(); + this.configDestination = configDestinationType == DestinationType.TOPIC ? + JCSMPFactory.onlyInstance().createTopic(destination.getName()) : + JCSMPFactory.onlyInstance().createQueue(destination.getName()); this.jcsmpSession = jcsmpSession; this.errorChannel = errorChannel; this.producerManager = producerManager; @@ -70,16 +76,9 @@ public void handleMessage(Message message) throws MessagingException { throw handleMessagingException(correlationKey, msg0, new ClosedChannelBindingException(msg1)); } - Topic targetTopic = topic; - - try { - String targetDestinationHeader = message.getHeaders().get(BinderHeaders.TARGET_DESTINATION, String.class); - if (StringUtils.hasText(targetDestinationHeader)) { - targetTopic = JCSMPFactory.onlyInstance().createTopic(targetDestinationHeader); - } - } catch (IllegalArgumentException e) { - throw handleMessagingException(correlationKey, - String.format("Unable to parse header %s", BinderHeaders.TARGET_DESTINATION), e); + Destination targetDestination = checkDynamicDestination(message, correlationKey); + if (targetDestination == null) { + targetDestination = configDestination; } try { @@ -98,11 +97,16 @@ public void handleMessage(Message message) throws MessagingException { correlationKey.setRawMessage(xmlMessage); xmlMessage.setCorrelationKey(correlationKey); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Publishing message to destination [ %s:%s ]", targetDestination instanceof Topic ? "TOPIC" : "QUEUE", targetDestination)); + } + try { - producer.send(xmlMessage, targetTopic); + producer.send(xmlMessage, targetDestination); } catch (JCSMPException e) { throw handleMessagingException(correlationKey, - String.format("Unable to send message to topic %s", targetTopic.getName()), e); + String.format("Unable to send message to destination %s %s", + targetDestination instanceof Topic ? "TOPIC" : "QUEUE", targetDestination.getName()), e); } finally { if (solaceMeterAccessor != null) { solaceMeterAccessor.recordMessage(properties.getBindingName(), xmlMessage); @@ -110,9 +114,41 @@ public void handleMessage(Message message) throws MessagingException { } } + private Destination checkDynamicDestination(Message message, ErrorChannelSendingCorrelationKey correlationKey) { + try { + String dynamicDestName; + String targetDestinationHeader = message.getHeaders().get(BinderHeaders.TARGET_DESTINATION, String.class); + if (StringUtils.hasText(targetDestinationHeader)) { + dynamicDestName = targetDestinationHeader.trim(); + } else { + return null; + } + + String targetDestinationTypeHeader = message.getHeaders().get(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, String.class); + if (StringUtils.hasText(targetDestinationTypeHeader)) { + targetDestinationTypeHeader = targetDestinationTypeHeader.trim().toUpperCase(); + if (targetDestinationTypeHeader.equals(DestinationType.TOPIC.name())) { + return JCSMPFactory.onlyInstance().createTopic(dynamicDestName); + } else if (targetDestinationTypeHeader.equals(DestinationType.QUEUE.name())) { + return JCSMPFactory.onlyInstance().createQueue(dynamicDestName); + } else { + throw new IllegalArgumentException(String.format("Incorrect value specified for header '%s'. Expected [ %s|%s ] but actual value is [ %s ]", + SolaceBinderHeaders.TARGET_DESTINATION_TYPE, DestinationType.TOPIC.name(), DestinationType.QUEUE.name(), targetDestinationTypeHeader)); + } + } + + //No dynamic destinationType present so use configured destinationType + return configDestinationType == DestinationType.TOPIC ? + JCSMPFactory.onlyInstance().createTopic(dynamicDestName) : + JCSMPFactory.onlyInstance().createQueue(dynamicDestName); + } catch (Exception e) { + throw handleMessagingException(correlationKey, "Unable to parse headers", e); + } + } + @Override public void start() { - logger.info(String.format("Creating producer to topic %s ", topic.getName(), id)); + logger.info(String.format("Creating producer to %s %s ", configDestinationType, configDestination.getName(), id)); if (isRunning()) { logger.warn(String.format("Nothing to do, message handler %s is already running", id)); return; @@ -132,7 +168,7 @@ public void start() { @Override public void stop() { if (!isRunning()) return; - logger.info(String.format("Stopping producer to topic %s ", topic.getName(), id)); + logger.info(String.format("Stopping producer to %s %s ", configDestinationType, configDestination.getName(), id)); producerManager.release(id); isRunning = false; } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceProducerProperties.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceProducerProperties.java index 7b78e9fc..bd2b06e7 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceProducerProperties.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceProducerProperties.java @@ -1,5 +1,6 @@ package com.solace.spring.cloud.stream.binder.properties; +import com.solace.spring.cloud.stream.binder.util.DestinationType; import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.ArrayList; @@ -13,6 +14,11 @@ @ConfigurationProperties(DEFAULTS_PREFIX + ".producer") public class SolaceProducerProperties extends SolaceCommonProperties { + /** + * The type of destination messages are published to. + */ + private DestinationType destinationType = DestinationType.TOPIC; + /** * A SpEL expression for creating the consumer group’s queue name. * Modifying this can cause naming conflicts between the queue names of consumer groups. @@ -43,6 +49,14 @@ public class SolaceProducerProperties extends SolaceCommonProperties { */ private boolean nonserializableHeaderConvertToString = false; + public DestinationType getDestinationType() { + return destinationType; + } + + public void setDestinationType(DestinationType destinationType) { + this.destinationType = destinationType; + } + public String getQueueNameExpression() { return queueNameExpression; } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProducerDestination.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProducerDestination.java index ebe592b1..dad51d9d 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProducerDestination.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceProducerDestination.java @@ -3,26 +3,26 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; class SolaceProducerDestination implements ProducerDestination { - private String topicName; + private String destinationName; - SolaceProducerDestination(String topicName) { - this.topicName = topicName; + SolaceProducerDestination(String destinationName) { + this.destinationName = destinationName; } @Override public String getName() { - return topicName; + return destinationName; } @Override public String getNameForPartition(int partition) { - return topicName; + return destinationName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("SolaceProducerDestination{"); - sb.append("topicName='").append(topicName).append('\''); + sb.append("destinationName='").append(destinationName).append('\''); sb.append('}'); return sb.toString(); } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceQueueProvisioner.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceQueueProvisioner.java index ada3af63..c1b374c0 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceQueueProvisioner.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceQueueProvisioner.java @@ -1,6 +1,7 @@ package com.solace.spring.cloud.stream.binder.provisioning; import com.solace.spring.cloud.stream.binder.properties.SolaceCommonProperties; +import com.solace.spring.cloud.stream.binder.util.DestinationType; import com.solacesystems.jcsmp.ConsumerFlowProperties; import com.solacesystems.jcsmp.EndpointProperties; import com.solacesystems.jcsmp.InvalidOperationException; @@ -50,6 +51,14 @@ public ProducerDestination provisionProducerDestination(String name, "Provisioning will continue under the assumption that it is disabled..."); } + if (properties.getExtension().getDestinationType() == DestinationType.QUEUE) { + if (properties.getRequiredGroups() != null && properties.getRequiredGroups().length > 0) { + throw new ProvisioningException(String.format("Producer requiredGroups are not supported when destinationType=%s", DestinationType.QUEUE)); + } + provisionQueueIfRequired(name, properties); + return new SolaceProducerDestination(name); + } + String topicName = SolaceProvisioningUtil.getTopicName(name, properties.getExtension()); Set requiredGroups = new HashSet<>(Arrays.asList(properties.getRequiredGroups())); @@ -58,11 +67,7 @@ public ProducerDestination provisionProducerDestination(String name, for (String groupName : requiredGroups) { String queueName = SolaceProvisioningUtil.getQueueName(topicName, groupName, properties); logger.info(String.format("Creating durable queue %s for required consumer group %s", queueName, groupName)); - EndpointProperties endpointProperties = SolaceProvisioningUtil.getEndpointProperties(properties.getExtension()); - boolean doDurableQueueProvisioning = properties.getExtension().isProvisionDurableQueue(); - Queue queue = provisionQueue(queueName, true, endpointProperties, doDurableQueueProvisioning, - properties.isAutoStartup()); - + Queue queue = provisionQueueIfRequired(queueName, properties); addSubscriptionToQueue(queue, topicName, properties.getExtension(), true); for (String extraTopic : requiredGroupsExtraSubs.getOrDefault(groupName, new String[0])) { @@ -133,6 +138,13 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou errorQueueName, additionalSubscriptions); } + private Queue provisionQueueIfRequired(String queueName, ExtendedProducerProperties properties) { + EndpointProperties endpointProperties = SolaceProvisioningUtil.getEndpointProperties(properties.getExtension()); + boolean doDurableQueueProvisioning = properties.getExtension().isProvisionDurableQueue(); + return provisionQueue(queueName, true, endpointProperties, doDurableQueueProvisioning, + properties.isAutoStartup()); + } + private Queue provisionQueue(String name, boolean isDurable, EndpointProperties endpointProperties, boolean doDurableProvisioning, boolean testFlowCxn) { return provisionQueue(name, isDurable, endpointProperties, doDurableProvisioning, testFlowCxn, "Durable queue"); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/DestinationType.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/DestinationType.java new file mode 100644 index 00000000..55ba3587 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/DestinationType.java @@ -0,0 +1,7 @@ +package com.solace.spring.cloud.stream.binder.util; + +public enum DestinationType { + + TOPIC, + QUEUE +} diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListenerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListenerTest.java new file mode 100644 index 00000000..544f5ac2 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/InboundXMLMessageListenerTest.java @@ -0,0 +1,45 @@ +package com.solace.spring.cloud.stream.binder.inbound; + +import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer; +import com.solace.spring.cloud.stream.binder.util.UnboundFlowReceiverContainerException; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.StaleSessionException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@ExtendWith(OutputCaptureExtension.class) +public class InboundXMLMessageListenerTest { + + @Mock + private ConsumerDestination consumerDestination; + + @Test + public void testListenerIsStoppedOnStaleSessionException(@Mock FlowReceiverContainer flowReceiverContainer, CapturedOutput output) + throws UnboundFlowReceiverContainerException, JCSMPException { + + when(flowReceiverContainer.receive()) + .thenThrow(new StaleSessionException("Session has become stale", new JCSMPException("Specific JCSMP exception"))); + + BasicInboundXMLMessageListener inboundXMLMessageListener = new BasicInboundXMLMessageListener( + flowReceiverContainer, consumerDestination, null, null, null, null, null, null, + null, null, false); + + inboundXMLMessageListener.run(); + + assertThat(output) + .contains("Session has lost connection") + .contains("Closing flow receiver to destination"); + + verify(flowReceiverContainer).unbind(); + } +} diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/acknowledge/JCSMPAcknowledgementCallbackIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/acknowledge/JCSMPAcknowledgementCallbackIT.java index 18a0b8f6..8f535e54 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/acknowledge/JCSMPAcknowledgementCallbackIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/acknowledge/JCSMPAcknowledgementCallbackIT.java @@ -71,7 +71,7 @@ initializers = ConfigDataApplicationContextInitializer.class) @ExtendWith(ExecutorServiceExtension.class) @ExtendWith(PubSubPlusExtension.class) -@Timeout(value = 1, unit = TimeUnit.MINUTES) +@Timeout(value = 2, unit = TimeUnit.MINUTES) public class JCSMPAcknowledgementCallbackIT { private RetryableTaskService retryableTaskService; private final AtomicReference flowReceiverContainerReference = new AtomicReference<>(); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java index 0eb8750f..491146d1 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java @@ -4,6 +4,7 @@ import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor; import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties; import com.solace.spring.cloud.stream.binder.util.CorrelationData; +import com.solace.spring.cloud.stream.binder.util.DestinationType; import com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey; import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager; import com.solace.spring.cloud.stream.binder.util.SolaceMessageHeaderErrorMessageStrategy; @@ -11,6 +12,8 @@ import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.Topic; import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; import org.apache.commons.lang3.RandomStringUtils; @@ -24,6 +27,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.messaging.Message; @@ -31,15 +35,18 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.MessageBuilder; +import java.time.Instant; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; @Timeout(value = 10) @ExtendWith(MockitoExtension.class) @@ -48,6 +55,7 @@ public class JCSMPOutboundMessageHandlerTest { private JCSMPOutboundMessageHandler messageHandler; private JCSMPStreamingPublishCorrelatingEventHandler pubEventHandler; private ArgumentCaptor xmlMessageCaptor; + private ArgumentCaptor destinationCaptor; private ExtendedProducerProperties producerProperties; @Mock private XMLMessageProducer messageProducer; @Mock private SolaceMeterAccessor solaceMeterAccessor; @@ -57,6 +65,7 @@ public void init(@Mock JCSMPSession session, @Mock MessageChannel errChannel, @Mock SolaceMessageHeaderErrorMessageStrategy errorMessageStrategy) throws JCSMPException { xmlMessageCaptor = ArgumentCaptor.forClass(XMLMessage.class); + destinationCaptor = ArgumentCaptor.forClass(Destination.class); ArgumentCaptor pubEventHandlerCaptor = ArgumentCaptor .forClass(JCSMPStreamingPublishCorrelatingEventHandler.class); @@ -161,14 +170,110 @@ public void testMeter(boolean success) throws Exception { JCSMPException exception = new JCSMPException("Expected exception"); Mockito.doThrow(exception) .when(messageProducer) - .send(xmlMessageCaptor.capture(), Mockito.any(Destination.class)); + .send(xmlMessageCaptor.capture(), any(Destination.class)); assertThatThrownBy(() -> messageHandler.handleMessage(message)) .isInstanceOf(MessagingException.class) .hasCause(exception); } Mockito.verify(solaceMeterAccessor, Mockito.times(1)) - .recordMessage(Mockito.eq(producerProperties.getBindingName()), Mockito.any()); + .recordMessage(Mockito.eq(producerProperties.getBindingName()), any()); + } + + @Test + public void test_dynamic_destinationName_only() throws JCSMPException { + Message message = MessageBuilder.withPayload("the payload") + .setHeader(BinderHeaders.TARGET_DESTINATION, "dynamicDestinationName") + .setHeader("SOME_HEADER", "HOLA") //add extra header and confirm it is kept + .build(); + messageHandler.handleMessage(message); + + Mockito.verify(messageProducer).send(xmlMessageCaptor.capture(), destinationCaptor.capture()); + Destination targetDestination = destinationCaptor.getValue(); + assertThat(targetDestination).isInstanceOf(Topic.class); + assertThat(targetDestination.getName()).isEqualTo("dynamicDestinationName"); + + XMLMessage sentMessage = xmlMessageCaptor.getValue(); + assertThat(sentMessage.getProperties().get(BinderHeaders.TARGET_DESTINATION)).isNull(); + assertThat(sentMessage.getProperties().get("SOME_HEADER")).isEqualTo("HOLA"); + } + + @ParameterizedTest + @ValueSource(strings = { "topic", "queue", " TOPIc ", " QueUe ", "", " " }) + public void test_dynamic_destinationName_and_destinationType(String destinationType) throws JCSMPException { + Message message = getMessageForDynamicDestination("dynamicDestinationName", destinationType); + messageHandler.handleMessage(message); + + Mockito.verify(messageProducer).send(xmlMessageCaptor.capture(), destinationCaptor.capture()); + Destination targetDestination = destinationCaptor.getValue(); + + //MessageHandler uses default producerProperties so blank and unspecified destinationType defaults to Topic + assertThat(targetDestination).isInstanceOf(destinationType.trim().equalsIgnoreCase("queue") ? Queue.class : Topic.class); + assertThat(targetDestination.getName()).isEqualTo("dynamicDestinationName"); + + //Verify headers don't get set on ongoing Solace message + XMLMessage sentMessage = xmlMessageCaptor.getValue(); + assertThat(sentMessage.getProperties().get(BinderHeaders.TARGET_DESTINATION)).isNull(); + assertThat(sentMessage.getProperties().get(SolaceBinderHeaders.TARGET_DESTINATION_TYPE)).isNull(); + } + + @ParameterizedTest + @ValueSource(strings = { "queue", "topic" }) + public void test_dynamic_destinationName_with_destinationType_configured_on_messageHandler(String type, @Mock JCSMPSession session) throws JCSMPException { + SolaceProducerProperties producerProperties = new SolaceProducerProperties(); + producerProperties.setDestinationType(type.equals("queue") ? DestinationType.QUEUE : DestinationType.TOPIC); + ProducerDestination dest = Mockito.mock(ProducerDestination.class); + Mockito.when(dest.getName()).thenReturn("thisIsOverriddenByDynamicDestinationName"); + + Mockito.when(session.getMessageProducer(any())).thenReturn(messageProducer); + + messageHandler = new JCSMPOutboundMessageHandler( + dest, + session, + null, + new JCSMPSessionProducerManager(session), + new ExtendedProducerProperties<>(producerProperties), + solaceMeterAccessor + ); + messageHandler.start(); + + Message message = getMessageForDynamicDestination("dynamicDestinationName", null); + messageHandler.handleMessage(message); + + Mockito.verify(messageProducer).send(any(), destinationCaptor.capture()); + Destination targetDestination = destinationCaptor.getValue(); + assertThat(targetDestination).isInstanceOf(type.equals("queue") ? Queue.class : Topic.class); + assertThat(targetDestination.getName()).isEqualTo("dynamicDestinationName"); + } + + @Test + public void test_dynamic_destination_with_invalid_destinationType() { + Message message = getMessageForDynamicDestination("dynamicDestinationName", "INVALID"); + Exception exception = assertThrows(MessagingException.class, () -> messageHandler.handleMessage(message)); + assertThat(exception) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Incorrect value specified for header 'solace_scst_targetDestinationType'. Expected [ TOPIC|QUEUE ] but actual value is [ INVALID ]"); + } + + @Test + public void test_dynamic_destinationName_with_invalid_header_value_type() throws JCSMPException { + Message message = getMessageForDynamicDestination(Instant.now(), null); + Exception exception = assertThrows(MessagingException.class, () -> messageHandler.handleMessage(message)); + assertThat(exception) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Incorrect type specified for header 'scst_targetDestination'. Expected [class java.lang.String] but actual type is [class java.time.Instant]"); + } + + @Test + public void test_dynamic_destinationType_with_invalid_header_value_type() throws JCSMPException { + Message message = MessageBuilder.withPayload("the payload") + .setHeader(BinderHeaders.TARGET_DESTINATION, "someDynamicDestinationName") + .setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, Instant.now()) + .build(); + Exception exception = assertThrows(MessagingException.class, () -> messageHandler.handleMessage(message)); + assertThat(exception) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Incorrect type specified for header 'solace_scst_targetDestinationType'. Expected [class java.lang.String] but actual type is [class java.time.Instant]"); } Message getMessage(CorrelationData correlationData) { @@ -177,8 +282,19 @@ Message getMessage(CorrelationData correlationData) { .build(); } + private Message getMessageForDynamicDestination(Object targetDestination, Object targetDestinationType) { + MessageBuilder builder = MessageBuilder.withPayload("the payload"); + if (targetDestination != null) { + builder.setHeader(BinderHeaders.TARGET_DESTINATION, targetDestination); + } + if (targetDestinationType != null) { + builder.setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, targetDestinationType); + } + return builder.build(); + } + private ErrorChannelSendingCorrelationKey getCorrelationKey() throws JCSMPException { - Mockito.verify(messageProducer).send(xmlMessageCaptor.capture(), Mockito.any(Destination.class)); + Mockito.verify(messageProducer).send(xmlMessageCaptor.capture(), any(Destination.class)); return (ErrorChannelSendingCorrelationKey) xmlMessageCaptor.getValue().getCorrelationKey(); } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java index c49be852..67733356 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java @@ -10,7 +10,7 @@ public class RetryableAssertions { public static Duration RETRY_INTERVAL = Duration.ofMillis(500); public static void retryAssert(SoftAssertionsProvider.ThrowingRunnable assertRun) throws InterruptedException { - retryAssert(assertRun, 10, TimeUnit.SECONDS); + retryAssert(assertRun, 30, TimeUnit.SECONDS); } @SuppressWarnings("BusyWait") diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java index fca225d8..f4d55838 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java @@ -26,6 +26,7 @@ import com.solacesystems.jcsmp.XMLContentMessage; import com.solacesystems.jcsmp.XMLMessage; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.assertj.core.api.Assertions; @@ -60,7 +61,6 @@ import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.SerializationUtils; -import org.testcontainers.shaded.org.apache.commons.lang.math.RandomUtils; import java.lang.reflect.Array; import java.lang.reflect.Parameter; @@ -248,10 +248,10 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce case SolaceHeaders.SENDER_TIMESTAMP: case SolaceHeaders.SEQUENCE_NUMBER: case SolaceHeaders.TIME_TO_LIVE: - value = (long) RandomUtils.JVM_RANDOM.nextInt(10000); + value = (long) RandomUtils.nextInt(0, 10000); break; case SolaceHeaders.PRIORITY: - value = RandomUtils.JVM_RANDOM.nextInt(255); + value = RandomUtils.nextInt(0, 255); break; case SolaceHeaders.REPLY_TO: value = JCSMPFactory.onlyInstance().createQueue(RandomStringUtils.randomAlphanumeric(10)); @@ -382,6 +382,7 @@ public void testMapSpringMessageToXMLMessage_NonWriteableSolaceProperties() thro case SolaceBinderHeaders.BATCHED_HEADERS: case SolaceBinderHeaders.CONFIRM_CORRELATION: case SolaceBinderHeaders.NULL_PAYLOAD: + case SolaceBinderHeaders.TARGET_DESTINATION_TYPE: assertNull(xmlMessage.getProperties().get(header.getKey())); break; default: @@ -596,10 +597,10 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo case SolaceHeaders.SENDER_TIMESTAMP: case SolaceHeaders.SEQUENCE_NUMBER: case SolaceHeaders.TIME_TO_LIVE: - value = (long) RandomUtils.JVM_RANDOM.nextInt(10000); + value = (long) RandomUtils.nextInt(0, 10000); break; case SolaceHeaders.PRIORITY: - value = RandomUtils.JVM_RANDOM.nextInt(255); + value = RandomUtils.nextInt(0, 255); break; case SolaceHeaders.REPLY_TO: value = JCSMPFactory.onlyInstance().createQueue(RandomStringUtils.randomAlphanumeric(10)); @@ -1058,6 +1059,9 @@ public void testMapXMLMessageToSpringMessage_NonReadableSolaceProperties(boolean case SolaceBinderHeaders.CONFIRM_CORRELATION: metadata.putString(header.getKey(), "random_string"); break; + case SolaceBinderHeaders.TARGET_DESTINATION_TYPE: + metadata.putString(header.getKey(), "topic"); + break; default: fail(String.format("no test for header %s", header.getKey())); } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml index 61e8c3a4..b6014857 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml @@ -5,12 +5,12 @@ com.solace.spring.cloud solace-spring-cloud-parent - 2.4.3-SNAPSHOT + 2.5.0-SNAPSHOT ../../solace-spring-cloud-parent/pom.xml spring-cloud-stream-binder-solace - 3.4.3-SNAPSHOT + 3.5.0-SNAPSHOT jar Solace Spring Cloud Stream Binder diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/resources/META-INF/shared.beans b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/resources/META-INF/shared.beans new file mode 100644 index 00000000..a13d8ca4 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/resources/META-INF/shared.beans @@ -0,0 +1,2 @@ +com.solace.spring.cloud.stream.binder.meter.SolaceMessageMeterBinder +com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor \ No newline at end of file diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java index a8e2b29c..4149c946 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java @@ -9,6 +9,7 @@ import com.solace.spring.cloud.stream.binder.test.spring.SpringCloudStreamContext; import com.solace.spring.cloud.stream.binder.test.util.SimpleJCSMPEventHandler; import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder; +import com.solace.spring.cloud.stream.binder.util.DestinationType; import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension; import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension.ExecSvc; import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension; @@ -22,6 +23,7 @@ import com.solacesystems.jcsmp.ClosedFacilityException; import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.EndpointProperties; +import com.solacesystems.jcsmp.FlowReceiver; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPInterruptedException; import com.solacesystems.jcsmp.JCSMPProperties; @@ -1299,6 +1301,83 @@ public void testFailResumeOnClosedConsumer(Class channelType, consumerBinding.unbind(); } + @Test + public void testProducerDestinationTypeSetAsQueue(JCSMPSession jcsmpSession) throws Exception { + SolaceTestBinder binder = getBinder(); + + DirectChannel moduleOutputChannel = createBindableChannel("output", new BindingProperties()); + String destination = RandomStringUtils.randomAlphanumeric(15); + + SolaceProducerProperties solaceProducerProperties = new SolaceProducerProperties(); + solaceProducerProperties.setDestinationType(DestinationType.QUEUE); + + Binding producerBinding = null; + FlowReceiver flowReceiver = null; + try { + producerBinding = binder.bindProducer(destination, moduleOutputChannel, new ExtendedProducerProperties<>(solaceProducerProperties)); + + byte[] payload = RandomStringUtils.randomAlphanumeric(15).getBytes(); + Message message = MessageBuilder.withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE) + .build(); + + binderBindUnbindLatency(); + + logger.info(String.format("Sending message to message handler: %s", message)); + moduleOutputChannel.send(message); + + flowReceiver = jcsmpSession.createFlow(JCSMPFactory.onlyInstance().createQueue(destination), null, null); + flowReceiver.start(); + BytesXMLMessage solMsg = flowReceiver.receive(5000); + assertThat(solMsg) + .isNotNull() + .isInstanceOf(BytesMessage.class); + assertThat(((BytesMessage) solMsg).getData()).isEqualTo(payload); + } finally { + if (flowReceiver != null) flowReceiver.close(); + if (producerBinding != null) producerBinding.unbind(); + } + } + + @Test + public void testProducerDestinationTypeSetAsTopic(JCSMPSession jcsmpSession) throws Exception { + SolaceTestBinder binder = getBinder(); + + DirectChannel moduleOutputChannel = createBindableChannel("output", new BindingProperties()); + String destination = RandomStringUtils.randomAlphanumeric(15); + + SolaceProducerProperties solaceProducerProperties = new SolaceProducerProperties(); + solaceProducerProperties.setDestinationType(DestinationType.TOPIC); + + Binding producerBinding = null; + XMLMessageConsumer consumer = null; + try { + producerBinding = binder.bindProducer(destination, moduleOutputChannel, new ExtendedProducerProperties<>(solaceProducerProperties)); + + jcsmpSession.addSubscription(JCSMPFactory.onlyInstance().createTopic(destination)); + consumer = jcsmpSession.getMessageConsumer((XMLMessageListener) null); + consumer.start(); + + byte[] payload = RandomStringUtils.randomAlphanumeric(15).getBytes(); + Message message = MessageBuilder.withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE) + .build(); + + logger.info(String.format("Sending message to message handler: %s", message)); + moduleOutputChannel.send(message); + + BytesXMLMessage solMsg = consumer.receive(5000); + assertThat(solMsg) + .isNotNull() + .isInstanceOf(BytesMessage.class); + assertThat(((BytesMessage) solMsg).getData()).isEqualTo(payload); + } finally { + if (consumer != null) consumer.close(); + if (jcsmpSession != null) jcsmpSession.removeSubscription(JCSMPFactory.onlyInstance().createTopic(destination)); + if (producerBinding != null) producerBinding.unbind(); + } + } + private List getTxFlows(SempV2Api sempV2Api, String vpnName, String queueName, Integer count) throws ApiException { return sempV2Api.monitor() .getMsgVpnQueueTxFlows(vpnName, queueName, count, null, null, null) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderProvisioningLifecycleIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderProvisioningLifecycleIT.java index 42d9758f..d3cc3e2f 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderProvisioningLifecycleIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderProvisioningLifecycleIT.java @@ -10,9 +10,12 @@ import com.solace.spring.cloud.stream.binder.test.spring.SpringCloudStreamContext; import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder; import com.solace.spring.cloud.stream.binder.test.util.ThrowingFunction; +import com.solace.spring.cloud.stream.binder.util.DestinationType; import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension; import com.solace.test.integration.semp.v2.SempV2Api; import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnQueue; +import com.solace.test.integration.semp.v2.monitor.ApiException; +import com.solace.test.integration.semp.v2.monitor.model.MonitorMsgVpnQueue; import com.solace.test.integration.semp.v2.monitor.model.MonitorMsgVpnQueueMsg; import com.solace.test.integration.semp.v2.monitor.model.MonitorMsgVpnQueueTxFlow; import com.solace.test.integration.semp.v2.monitor.model.MonitorMsgVpnQueueTxFlowResponse; @@ -85,6 +88,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * All tests which modify the default provisioning lifecycle. @@ -2151,4 +2155,81 @@ public void testProducerNoAutoStartAndQueueNotExist( producerBinding.stop(); } -} + + @Test + public void testQueueProvisioningWithProducerDestinationTypeSetToQueue(JCSMPSession jcsmpSession, SpringCloudStreamContext context, SempV2Api sempV2Api, + SoftAssertions softly, TestInfo testInfo) throws Exception { + SolaceTestBinder binder = context.getBinder(); + String vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME); + + DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties()); + + String destination = RandomStringUtils.randomAlphanumeric(20); + + ExtendedProducerProperties producerProperties = context.createProducerProperties(testInfo); + producerProperties.getExtension().setDestinationType(DestinationType.QUEUE); + assertThat(producerProperties.getExtension().isProvisionDurableQueue()).isTrue(); + assertThat(producerProperties.getExtension().getQueueAccessType()).isEqualTo(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE); + + Queue queue = JCSMPFactory.onlyInstance().createQueue(destination); + Binding producerBinding = null; + + try { + producerBinding = binder.bindProducer(destination, moduleOutputChannel, producerProperties); + + MonitorMsgVpnQueue vpnQueue = sempV2Api.monitor().getMsgVpnQueue(vpnName, destination, null).getData(); + softly.assertThat(vpnQueue.getQueueName()).isEqualTo(destination); + softly.assertThat(vpnQueue.getAccessType()).isEqualTo(MonitorMsgVpnQueue.AccessTypeEnum.NON_EXCLUSIVE); + } finally { + if (producerBinding != null) producerBinding.unbind(); + jcsmpSession.deprovision(queue, JCSMPSession.FLAG_IGNORE_DOES_NOT_EXIST); + } + } + + @Test + public void testQueueProvisioningDisabledWithProducerDestinationTypeSetToQueue(JCSMPSession jcsmpSession, SpringCloudStreamContext context, SempV2Api sempV2Api, + TestInfo testInfo) throws Exception { + SolaceTestBinder binder = context.getBinder(); + String vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME); + + DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties()); + + String destination = RandomStringUtils.randomAlphanumeric(20); + + ExtendedProducerProperties producerProperties = context.createProducerProperties(testInfo); + producerProperties.getExtension().setDestinationType(DestinationType.QUEUE); + producerProperties.getExtension().setProvisionDurableQueue(false); + + Binding producerBinding = null; + try { + Exception provisioningException = assertThrows(ProvisioningException.class, () -> binder.bindProducer(destination, moduleOutputChannel, producerProperties)); + assertThat(provisioningException) + .hasMessageContaining( + String.format("Failed to connect test consumer flow to queue %s. Provisioning is disabled, queue was not provisioned nor was its configuration validated.", destination)); + ApiException apiException = assertThrows(ApiException.class, () -> sempV2Api.monitor().getMsgVpnQueue(vpnName, destination, null)); + assertThat(apiException.getCode()).isEqualTo(400); + } finally { + if (producerBinding != null) producerBinding.unbind(); + } + } + + @Test + public void testProducerDestinationTypeSetToQueueWithRequiredGroups(SpringCloudStreamContext context, TestInfo testInfo) throws Exception { + SolaceTestBinder binder = context.getBinder(); + DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties()); + String destination = RandomStringUtils.randomAlphanumeric(20); + + ExtendedProducerProperties producerProperties = context.createProducerProperties(testInfo); + producerProperties.setRequiredGroups("group-A"); + producerProperties.getExtension().setDestinationType(DestinationType.QUEUE); + + Binding producerBinding = null; + try { + Exception provisioningException = assertThrows(ProvisioningException.class, () -> binder.bindProducer(destination, moduleOutputChannel, producerProperties)); + assertThat(provisioningException) + .hasMessageContaining("Producer requiredGroups are not supported when destinationType=QUEUE"); + } finally { + if (producerBinding != null) producerBinding.unbind(); + } + } +} \ No newline at end of file diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/springBootTests/MultiBinderIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/springBootTests/MultiBinderIT.java new file mode 100644 index 00000000..29ad7329 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/springBootTests/MultiBinderIT.java @@ -0,0 +1,101 @@ +package com.solace.spring.cloud.stream.binder.springBootTests; + +import com.solace.test.integration.testcontainer.PubSubPlusContainer; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import com.solacesystems.jcsmp.XMLMessageProducer; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.web.servlet.MockMvc; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +/** + * Note: not using the PubSubPlusExtension as it appears to trigger after Spring Boot Application is started. + * Left as an exercise to convert to it if possible. This would prevent a second broker from being started. + */ +@SpringBootTest +@AutoConfigureMockMvc +@ActiveProfiles("multibinder") +@DirtiesContext //Ensures all listeners are stopped +public class MultiBinderIT { + private static final PubSubPlusContainer container = new PubSubPlusContainer(); + private static final JCSMPSession jcsmpSession; + private static final XMLMessageProducer producer; + private static final String QUEUE_NAME_PREFIX = "scst/wk/myConsumerGroup/plain/"; + private static final String QUEUE_NAME_1 = "MultiBinder/Queue/1"; + static { + container.start(); + JCSMPProperties props = new JCSMPProperties(container.getHost(), + container.getMappedPort(PubSubPlusContainer.Port.SMF.getInternalPort()), + container.getAdminUsername(), container.getAdminPassword(), null); + try { + jcsmpSession = JCSMPFactory.onlyInstance().createSession(props); + jcsmpSession.connect(); + producer = jcsmpSession.getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() { + @Override + public void responseReceivedEx(Object o) { } + + @Override + public void handleErrorEx(Object o, JCSMPException e, long l) { } + }); + } catch (Exception e) { + throw new RuntimeException("Failed setup", e); + } + } + + @AfterAll + public static void close() { + if (jcsmpSession != null) { + jcsmpSession.closeSession(); + } + } + + @DynamicPropertySource + static void registerPgProperties(DynamicPropertyRegistry registry) { + registry.add("spring.cloud.stream.binders.solace1.environment.solace.java.host", + () -> String.format("tcp://%s:%d", container.getHost(), container.getMappedPort(55555))); + registry.add("spring.cloud.stream.binders.solace2.environment.solace.java.host", + () -> String.format("tcp://%s:%d", container.getHost(), container.getMappedPort(55555))); + } + + @Test + public void checkHealthOfMultipleSolaceBinders(@Autowired MockMvc mvc) throws Exception { + mvc.perform(get("/actuator/health")) + .andExpectAll( + status().isOk(), + jsonPath("components.binders.components.solace1").exists(), + jsonPath("components.binders.components.solace1.status").value("UP"), + jsonPath("components.binders.components.solace2").exists(), + jsonPath("components.binders.components.solace2.status").value("UP") + ); + } + + @Test + public void checkSolaceMetricsAreExposed(@Autowired MockMvc mvc) throws Exception { + //Send a message to activate metrics + producer.send(JCSMPFactory.onlyInstance().createBytesXMLMessage(), + JCSMPFactory.onlyInstance().createQueue(QUEUE_NAME_PREFIX + QUEUE_NAME_1)); + + mvc.perform(get("/actuator/metrics")) + .andExpectAll( + jsonPath("names", Matchers.hasItem("solace.message.size.payload")), + jsonPath("names", Matchers.hasItem("solace.message.size.total")) + ); + } + +} diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/springBootTests/SpringCloudStreamApp.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/springBootTests/SpringCloudStreamApp.java new file mode 100644 index 00000000..673e0773 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/springBootTests/SpringCloudStreamApp.java @@ -0,0 +1,27 @@ +package com.solace.spring.cloud.stream.binder.springBootTests; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; + +import java.util.function.Consumer; + +@SpringBootApplication +public class SpringCloudStreamApp { + + public static void main(String[] args) { + SpringApplication.run(SpringCloudStreamApp.class, args); + } + + @Bean + public Consumer> consume() { + return (msg -> System.out.println(msg.getPayload())); + } + + @Bean + public Consumer> otherConsume() { + return (msg -> System.out.println(msg.getPayload())); + } + +} diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java index 28753ac6..c0bd282c 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/RetryableAssertions.java @@ -7,7 +7,7 @@ public class RetryableAssertions { public static void retryAssert(SoftAssertionsProvider.ThrowingRunnable assertRun) throws InterruptedException { - retryAssert(10, TimeUnit.SECONDS, assertRun); + retryAssert(30, TimeUnit.SECONDS, assertRun); } @SuppressWarnings("BusyWait") diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/resources/application-multibinder.yml b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/resources/application-multibinder.yml new file mode 100644 index 00000000..5ca8176c --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/resources/application-multibinder.yml @@ -0,0 +1,40 @@ +spring: + cloud: + function: + definition: "consume;otherConsume" + stream: + binders: + solace1: + type: solace + environment: + solace: + java: + host: placeholder + solace2: + type: solace + environment: + solace: + java: + host: placeholder + bindings: + consume-in-0: + destination: MultiBinder/Queue/1 + group: myConsumerGroup + binder: solace1 + otherConsume-in-0: + destination: MultiBinder/Queue/2 + group: myConsumerGroup + binder: solace2 + default-binder: solace1 +management: + endpoint: + health: + show-components: always + show-details: always + endpoints: + web: # Actuator web endpoint configuration. For more info: https://docs.spring.io/spring-boot/docs/current/reference/html/actuator.html#actuator.endpoints + exposure: + include: 'health,metrics' + health: + binders: + enabled: true