From 8c0950e89b30e232ea7b54f9832d347ebed20edd Mon Sep 17 00:00:00 2001 From: Pascal Dal Farra Date: Wed, 5 Jun 2024 12:58:55 +0200 Subject: [PATCH] tests: additional RabbitListener tests --- .../examples/amqp/AmqpConstants.java | 44 +++++- .../configuration/RabbitConfiguration.java | 17 ++- .../amqp/consumers/ExampleConsumer.java | 92 ++++++++++-- .../examples/amqp/dtos/GenericPayloadDto.java | 19 +++ .../amqp/producers/AnotherProducer.java | 6 +- .../examples/amqp/ProducerSystemTest.java | 2 +- .../src/test/resources/asyncapi.yaml | 141 ++++++++++++++++++ 7 files changed, 291 insertions(+), 30 deletions(-) create mode 100644 springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/dtos/GenericPayloadDto.java diff --git a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/AmqpConstants.java b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/AmqpConstants.java index aa1012f48..4701d2921 100644 --- a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/AmqpConstants.java +++ b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/AmqpConstants.java @@ -1,15 +1,45 @@ // SPDX-License-Identifier: Apache-2.0 package io.github.springwolf.examples.amqp; -public class AmqpConstants { +import lombok.AccessLevel; +import lombok.NoArgsConstructor; - public static final String EXAMPLE_TOPIC_EXCHANGE = "example-topic-exchange"; +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class AmqpConstants { - public static final String EXAMPLE_TOPIC_ROUTING_KEY = "example-topic-routing-key"; + // Exchanges - public static final String EXAMPLE_QUEUE = "example-queue"; - public static final String ANOTHER_QUEUE = "another-queue"; - public static final String MULTI_PAYLOAD_QUEUE = "multi-payload-queue"; + public static final String EXCHANGE_EXAMPLE_TOPIC_EXCHANGE = "example-topic-exchange"; + public static final String EXCHANGE_CRUD_TOPIC_EXCHANGE_1 = "CRUD-topic-exchange-1"; + public static final String EXCHANGE_CRUD_TOPIC_EXCHANGE_2 = "CRUD-topic-exchange-2"; + /** + * Direct Exchange - Routing is based on 'Routing key'. + * Messages are 'routed' towards queue which name is equals to 'routing key'
+ * Note:
+ * The 'Default exchange' is a 'Direct exchange' with an empty name ( name= "") + */ + public static final String EXCHANGE_DEFAULT_EXCHANGE = ""; - public static final String EXAMPLE_BINDINGS_QUEUE = "example-bindings-queue"; + // Routing keys + + /** + * When a queue is bound with "#" (hash) binding key, + * it will receive all the messages, regardless of the routing key - like in fanout exchange. + */ + public static final String ROUTING_KEY_ALL_MESSAGES = "#"; + + public static final String ROUTING_KEY_CRUD = "crud-routing-key"; + public static final String ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY = "example-topic-routing-key"; + + // Queues + + public static final String QUEUE_EXAMPLE_QUEUE = "example-queue"; + public static final String QUEUE_ANOTHER_QUEUE = "another-queue"; + public static final String QUEUE_MULTI_PAYLOAD_QUEUE = "multi-payload-queue"; + public static final String QUEUE_EXAMPLE_BINDINGS_QUEUE = "example-bindings-queue"; + + public static final String QUEUE_CREATE = "queue-create"; + public static final String QUEUE_READ = "queue-read"; + public static final String QUEUE_DELETE = "queue-delete"; + public static final String QUEUE_UPDATE = "queue-update"; } diff --git a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/configuration/RabbitConfiguration.java b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/configuration/RabbitConfiguration.java index 1f8f2d4ad..7f775adbd 100644 --- a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/configuration/RabbitConfiguration.java +++ b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/configuration/RabbitConfiguration.java @@ -24,34 +24,39 @@ public Jackson2JsonMessageConverter converter() { @Bean public Queue exampleQueue() { - return new Queue(AmqpConstants.EXAMPLE_QUEUE, false); + return new Queue(AmqpConstants.QUEUE_EXAMPLE_QUEUE, false); } @Bean public Queue anotherQueue() { - return new Queue(AmqpConstants.ANOTHER_QUEUE, false); + return new Queue(AmqpConstants.QUEUE_ANOTHER_QUEUE, false); } @Bean public Queue exampleBindingsQueue() { - return new Queue(AmqpConstants.EXAMPLE_BINDINGS_QUEUE, false, true, true); + return new Queue(AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, false, true, true); + } + + @Bean + public Queue queueRead() { + return new Queue(AmqpConstants.QUEUE_READ, false); } @Bean public Exchange exampleTopicExchange() { - return new TopicExchange(AmqpConstants.EXAMPLE_TOPIC_EXCHANGE); + return new TopicExchange(AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE); } @Bean public Queue multiPayloadQueue() { - return new Queue(AmqpConstants.MULTI_PAYLOAD_QUEUE); + return new Queue(AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE); } @Bean public Binding exampleTopicBinding(Queue exampleBindingsQueue, Exchange exampleTopicExchange) { return BindingBuilder.bind(exampleBindingsQueue) .to(exampleTopicExchange) - .with(AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY) + .with(AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY) .noargs(); } } diff --git a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/consumers/ExampleConsumer.java b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/consumers/ExampleConsumer.java index 8447c4af0..e0af773a4 100644 --- a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/consumers/ExampleConsumer.java +++ b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/consumers/ExampleConsumer.java @@ -4,14 +4,17 @@ import io.github.springwolf.examples.amqp.AmqpConstants; import io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto; import io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto; +import io.github.springwolf.examples.amqp.dtos.GenericPayloadDto; import io.github.springwolf.examples.amqp.producers.AnotherProducer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component @@ -21,9 +24,9 @@ public class ExampleConsumer { private final AnotherProducer anotherProducer; - @RabbitListener(queues = AmqpConstants.EXAMPLE_QUEUE) + @RabbitListener(queues = AmqpConstants.QUEUE_EXAMPLE_QUEUE) public void receiveExamplePayload(ExamplePayloadDto payload) { - log.info("Received new message in {}: {}", AmqpConstants.EXAMPLE_QUEUE, payload.toString()); + log.info("Received new message in {}: {}", AmqpConstants.QUEUE_EXAMPLE_QUEUE, payload.toString()); AnotherPayloadDto example = new AnotherPayloadDto(); example.setExample(payload); @@ -32,45 +35,106 @@ public void receiveExamplePayload(ExamplePayloadDto payload) { anotherProducer.sendMessage(example); } - @RabbitListener(queues = AmqpConstants.ANOTHER_QUEUE) + @RabbitListener(queues = AmqpConstants.QUEUE_ANOTHER_QUEUE) public void receiveAnotherPayload(AnotherPayloadDto payload) { - log.info("Received new message in {}: {}", AmqpConstants.ANOTHER_QUEUE, payload.toString()); + log.info("Received new message in {}: {}", AmqpConstants.QUEUE_ANOTHER_QUEUE, payload.toString()); } @RabbitListener( bindings = { @QueueBinding( - exchange = @Exchange(name = AmqpConstants.EXAMPLE_TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC), + exchange = + @Exchange( + name = AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE, + type = ExchangeTypes.TOPIC), value = @Queue( - name = AmqpConstants.EXAMPLE_BINDINGS_QUEUE, + name = AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, durable = "false", exclusive = "true", autoDelete = "true"), - key = AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY) + key = AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY) }) public void bindingsExample(AnotherPayloadDto payload) { log.info( "Received new message in {}" + " through exchange {}" + " using routing key {}: {}", - AmqpConstants.EXAMPLE_BINDINGS_QUEUE, - AmqpConstants.EXAMPLE_TOPIC_EXCHANGE, - AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY, + AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, + AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE, + AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY, payload.toString()); } - @RabbitListener(queues = AmqpConstants.MULTI_PAYLOAD_QUEUE) + @RabbitListener(queues = AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE) public void bindingsBeanExample(AnotherPayloadDto payload) { log.info( "Received new message in {} (AnotherPayloadDto): {}", - AmqpConstants.MULTI_PAYLOAD_QUEUE, + AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE, payload.toString()); } - @RabbitListener(queues = AmqpConstants.MULTI_PAYLOAD_QUEUE) + @RabbitListener(queues = AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE) public void bindingsBeanExample(ExamplePayloadDto payload) { log.info( "Received new message in {} (ExamplePayloadDto): {}", - AmqpConstants.MULTI_PAYLOAD_QUEUE, + AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE, + payload.toString()); + } + + @RabbitListener( + autoStartup = "false", + queuesToDeclare = @Queue(name = AmqpConstants.QUEUE_CREATE, autoDelete = "false", durable = "true")) + public void queuesToDeclareCreate(Message message, @Payload GenericPayloadDto payload) { + log.info( + "Received new message {} in {} (GenericPayloadDto): {}", + message, + AmqpConstants.QUEUE_CREATE, + payload.toString()); + } + + @RabbitListener( + autoStartup = "false", + queuesToDeclare = @Queue(name = AmqpConstants.QUEUE_DELETE, autoDelete = "false", durable = "true")) + public void queuesToDeclareDelete(Message message, @Payload GenericPayloadDto payload) { + log.info( + "Received new message {} in {} (GenericPayloadDto): {}", + message, + AmqpConstants.QUEUE_DELETE, + payload.toString()); + } + + @RabbitListener( + autoStartup = "false", + bindings = + @QueueBinding( + exchange = + @Exchange( + name = AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_1, + type = ExchangeTypes.TOPIC), + key = AmqpConstants.ROUTING_KEY_ALL_MESSAGES, + value = @Queue(name = AmqpConstants.QUEUE_UPDATE, durable = "true", autoDelete = "false"))) + public void bindingsUpdate(Message message, @Payload GenericPayloadDto payload) { + log.info( + "Received new message {} in {} (GenericPayloadDto): {}", + message, + AmqpConstants.QUEUE_UPDATE, + payload.toString()); + } + + @RabbitListener( + autoStartup = "false", + bindings = + @QueueBinding( + exchange = + @Exchange( + name = AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_2, + type = ExchangeTypes.TOPIC), + key = AmqpConstants.ROUTING_KEY_ALL_MESSAGES, + value = @Queue(name = AmqpConstants.QUEUE_READ, durable = "true", autoDelete = "false"))) + public void bindingsRead(Message message, @Payload ExamplePayloadDto payload) { + log.info( + "Received new message {} in {} (ExamplePayloadDto): {}", + message, + AmqpConstants.QUEUE_UPDATE, payload.toString()); } } diff --git a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/dtos/GenericPayloadDto.java b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/dtos/GenericPayloadDto.java new file mode 100644 index 000000000..c376517a9 --- /dev/null +++ b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/dtos/GenericPayloadDto.java @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.springwolf.examples.amqp.dtos; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import static io.swagger.v3.oas.annotations.media.Schema.RequiredMode.REQUIRED; + +@Schema(description = "Generic payload model") +@Data +@AllArgsConstructor +@NoArgsConstructor +public class GenericPayloadDto { + + @Schema(description = "Generic Payload field", requiredMode = REQUIRED) + private T genericValue; +} diff --git a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/producers/AnotherProducer.java b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/producers/AnotherProducer.java index 70e2081a8..016a903e5 100644 --- a/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/producers/AnotherProducer.java +++ b/springwolf-examples/springwolf-amqp-example/src/main/java/io/github/springwolf/examples/amqp/producers/AnotherProducer.java @@ -18,12 +18,14 @@ public class AnotherProducer { @AsyncPublisher( operation = @AsyncOperation( - channelName = AmqpConstants.EXAMPLE_TOPIC_EXCHANGE, + channelName = AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE, description = "Custom, optional description defined in the AsyncPublisher annotation")) @AmqpAsyncOperationBinding() public void sendMessage(AnotherPayloadDto msg) { // send rabbitTemplate.convertAndSend( - AmqpConstants.EXAMPLE_TOPIC_EXCHANGE, AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY, msg); + AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE, + AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY, + msg); } } diff --git a/springwolf-examples/springwolf-amqp-example/src/test/java/io/github/springwolf/examples/amqp/ProducerSystemTest.java b/springwolf-examples/springwolf-amqp-example/src/test/java/io/github/springwolf/examples/amqp/ProducerSystemTest.java index 89ff0e5dc..884e5f54f 100644 --- a/springwolf-examples/springwolf-amqp-example/src/test/java/io/github/springwolf/examples/amqp/ProducerSystemTest.java +++ b/springwolf-examples/springwolf-amqp-example/src/test/java/io/github/springwolf/examples/amqp/ProducerSystemTest.java @@ -75,7 +75,7 @@ void producerCanUseSpringwolfConfigurationToSendMessage() { payload.setSomeEnum(FOO1); // when - springwolfAmqpProducer.send(AmqpConstants.EXAMPLE_QUEUE, payload); + springwolfAmqpProducer.send(AmqpConstants.QUEUE_EXAMPLE_QUEUE, payload); // then verify(exampleConsumer, timeout(10000)).receiveExamplePayload(payload); diff --git a/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml b/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml index 58ce4717f..3448e0941 100644 --- a/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml +++ b/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml @@ -19,6 +19,34 @@ servers: host: amqp:5672 protocol: amqp channels: + 'CRUD-topic-exchange-1_#': + messages: + io.github.springwolf.examples.amqp.dtos.GenericPayloadDto: + $ref: '#/components/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' + bindings: + amqp: + is: routingKey + exchange: + name: CRUD-topic-exchange-1 + type: topic + durable: true + autoDelete: false + vhost: / + bindingVersion: 0.3.0 + 'CRUD-topic-exchange-2_#': + messages: + io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto: + $ref: '#/components/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto' + bindings: + amqp: + is: routingKey + exchange: + name: CRUD-topic-exchange-2 + type: topic + durable: true + autoDelete: false + vhost: / + bindingVersion: 0.3.0 another-queue: messages: io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto: @@ -81,6 +109,48 @@ channels: autoDelete: false vhost: / bindingVersion: 0.3.0 + queue-create: + messages: + io.github.springwolf.examples.amqp.dtos.GenericPayloadDto: + $ref: '#/components/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' + bindings: + amqp: + is: queue + queue: + name: queue-create + durable: true + exclusive: false + autoDelete: false + vhost: / + bindingVersion: 0.3.0 + queue-update: + messages: + io.github.springwolf.examples.amqp.dtos.GenericPayloadDto: + $ref: '#/components/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' + bindings: + amqp: + is: queue + queue: + name: queue-update + durable: true + exclusive: false + autoDelete: false + vhost: / + bindingVersion: 0.3.0 + queue-delete: + messages: + io.github.springwolf.examples.amqp.dtos.GenericPayloadDto: + $ref: '#/components/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' + bindings: + amqp: + is: queue + queue: + name: queue-delete + durable: true + exclusive: false + autoDelete: false + vhost: / + bindingVersion: 0.3.0 components: schemas: HeadersNotDocumented: @@ -144,6 +214,17 @@ components: required: - someEnum - someString + io.github.springwolf.examples.amqp.dtos.GenericPayloadDto: + type: object + properties: + genericValue: + type: object + description: Generic Payload field + description: Generic payload model + examples: + - genericValue: {} + required: + - genericValue messages: io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto: headers: @@ -170,7 +251,43 @@ components: bindings: amqp: bindingVersion: 0.3.0 + io.github.springwolf.examples.amqp.dtos.GenericPayloadDto: + headers: + $ref: '#/components/schemas/SpringRabbitListenerDefaultHeaders' + payload: + schemaFormat: application/vnd.aai.asyncapi+json;version=3.0.0 + schema: + $ref: '#/components/schemas/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' + name: io.github.springwolf.examples.amqp.dtos.GenericPayloadDto + title: GenericPayloadDto + bindings: + amqp: + bindingVersion: 0.3.0 operations: + '#_receive_bindingsRead': + action: receive + channel: + $ref: '#/channels/CRUD-topic-exchange-2_#' + bindings: + amqp: + expiration: 0 + cc: + - '#' + bindingVersion: 0.3.0 + messages: + - $ref: '#/channels/#/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto' + '#_receive_bindingsUpdate': + action: receive + channel: + $ref: '#/channels/CRUD-topic-exchange-1_#' + bindings: + amqp: + expiration: 0 + cc: + - '#' + bindingVersion: 0.3.0 + messages: + - $ref: '#/channels/#/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' another-queue_receive_receiveAnotherPayload: action: receive channel: @@ -238,3 +355,27 @@ operations: messages: - $ref: '#/channels/multi-payload-queue/messages/io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto' - $ref: '#/channels/multi-payload-queue/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto' + queue-create_receive_queuesToDeclareCreate: + action: receive + channel: + $ref: '#/channels/queue-create' + bindings: + amqp: + expiration: 0 + cc: + - queue-create + bindingVersion: 0.3.0 + messages: + - $ref: '#/channels/queue-create/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' + queue-delete_receive_queuesToDeclareDelete: + action: receive + channel: + $ref: '#/channels/queue-delete' + bindings: + amqp: + expiration: 0 + cc: + - queue-delete + bindingVersion: 0.3.0 + messages: + - $ref: '#/channels/queue-delete/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDto' \ No newline at end of file