Skip to content

Commit

Permalink
tests: additional RabbitListener tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal Dal Farra committed Jun 5, 2024
1 parent 65fed2d commit 8c0950e
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.examples.amqp;

public class AmqpConstants {
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

public static final String EXAMPLE_TOPIC_EXCHANGE = "example-topic-exchange";
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AmqpConstants {

public static final String EXAMPLE_TOPIC_ROUTING_KEY = "example-topic-routing-key";
// Exchanges

public static final String EXAMPLE_QUEUE = "example-queue";
public static final String ANOTHER_QUEUE = "another-queue";
public static final String MULTI_PAYLOAD_QUEUE = "multi-payload-queue";
public static final String EXCHANGE_EXAMPLE_TOPIC_EXCHANGE = "example-topic-exchange";
public static final String EXCHANGE_CRUD_TOPIC_EXCHANGE_1 = "CRUD-topic-exchange-1";
public static final String EXCHANGE_CRUD_TOPIC_EXCHANGE_2 = "CRUD-topic-exchange-2";
/**
* Direct Exchange - Routing is based on 'Routing key'.
* Messages are 'routed' towards queue which name is equals to 'routing key' <BR/>
* Note:<BR/>
* The 'Default exchange' is a 'Direct exchange' with an empty name ( name= "")
*/
public static final String EXCHANGE_DEFAULT_EXCHANGE = "";

public static final String EXAMPLE_BINDINGS_QUEUE = "example-bindings-queue";
// Routing keys

/**
* When a queue is bound with "#" (hash) binding key,
* it will receive all the messages, regardless of the routing key - like in fanout exchange.
*/
public static final String ROUTING_KEY_ALL_MESSAGES = "#";

public static final String ROUTING_KEY_CRUD = "crud-routing-key";
public static final String ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY = "example-topic-routing-key";

// Queues

public static final String QUEUE_EXAMPLE_QUEUE = "example-queue";
public static final String QUEUE_ANOTHER_QUEUE = "another-queue";
public static final String QUEUE_MULTI_PAYLOAD_QUEUE = "multi-payload-queue";
public static final String QUEUE_EXAMPLE_BINDINGS_QUEUE = "example-bindings-queue";

public static final String QUEUE_CREATE = "queue-create";
public static final String QUEUE_READ = "queue-read";
public static final String QUEUE_DELETE = "queue-delete";
public static final String QUEUE_UPDATE = "queue-update";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,39 @@ public Jackson2JsonMessageConverter converter() {

@Bean
public Queue exampleQueue() {
return new Queue(AmqpConstants.EXAMPLE_QUEUE, false);
return new Queue(AmqpConstants.QUEUE_EXAMPLE_QUEUE, false);
}

@Bean
public Queue anotherQueue() {
return new Queue(AmqpConstants.ANOTHER_QUEUE, false);
return new Queue(AmqpConstants.QUEUE_ANOTHER_QUEUE, false);
}

@Bean
public Queue exampleBindingsQueue() {
return new Queue(AmqpConstants.EXAMPLE_BINDINGS_QUEUE, false, true, true);
return new Queue(AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE, false, true, true);
}

@Bean
public Queue queueRead() {
return new Queue(AmqpConstants.QUEUE_READ, false);
}

@Bean
public Exchange exampleTopicExchange() {
return new TopicExchange(AmqpConstants.EXAMPLE_TOPIC_EXCHANGE);
return new TopicExchange(AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE);
}

@Bean
public Queue multiPayloadQueue() {
return new Queue(AmqpConstants.MULTI_PAYLOAD_QUEUE);
return new Queue(AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE);
}

@Bean
public Binding exampleTopicBinding(Queue exampleBindingsQueue, Exchange exampleTopicExchange) {
return BindingBuilder.bind(exampleBindingsQueue)
.to(exampleTopicExchange)
.with(AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY)
.with(AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY)
.noargs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import io.github.springwolf.examples.amqp.AmqpConstants;
import io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto;
import io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto;
import io.github.springwolf.examples.amqp.dtos.GenericPayloadDto;
import io.github.springwolf.examples.amqp.producers.AnotherProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -21,9 +24,9 @@ public class ExampleConsumer {

private final AnotherProducer anotherProducer;

@RabbitListener(queues = AmqpConstants.EXAMPLE_QUEUE)
@RabbitListener(queues = AmqpConstants.QUEUE_EXAMPLE_QUEUE)
public void receiveExamplePayload(ExamplePayloadDto payload) {
log.info("Received new message in {}: {}", AmqpConstants.EXAMPLE_QUEUE, payload.toString());
log.info("Received new message in {}: {}", AmqpConstants.QUEUE_EXAMPLE_QUEUE, payload.toString());

AnotherPayloadDto example = new AnotherPayloadDto();
example.setExample(payload);
Expand All @@ -32,45 +35,106 @@ public void receiveExamplePayload(ExamplePayloadDto payload) {
anotherProducer.sendMessage(example);
}

@RabbitListener(queues = AmqpConstants.ANOTHER_QUEUE)
@RabbitListener(queues = AmqpConstants.QUEUE_ANOTHER_QUEUE)
public void receiveAnotherPayload(AnotherPayloadDto payload) {
log.info("Received new message in {}: {}", AmqpConstants.ANOTHER_QUEUE, payload.toString());
log.info("Received new message in {}: {}", AmqpConstants.QUEUE_ANOTHER_QUEUE, payload.toString());
}

@RabbitListener(
bindings = {
@QueueBinding(
exchange = @Exchange(name = AmqpConstants.EXAMPLE_TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC),
exchange =
@Exchange(
name = AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE,
type = ExchangeTypes.TOPIC),
value =
@Queue(
name = AmqpConstants.EXAMPLE_BINDINGS_QUEUE,
name = AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE,
durable = "false",
exclusive = "true",
autoDelete = "true"),
key = AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY)
key = AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY)
})
public void bindingsExample(AnotherPayloadDto payload) {
log.info(
"Received new message in {}" + " through exchange {}" + " using routing key {}: {}",
AmqpConstants.EXAMPLE_BINDINGS_QUEUE,
AmqpConstants.EXAMPLE_TOPIC_EXCHANGE,
AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY,
AmqpConstants.QUEUE_EXAMPLE_BINDINGS_QUEUE,
AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE,
AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY,
payload.toString());
}

@RabbitListener(queues = AmqpConstants.MULTI_PAYLOAD_QUEUE)
@RabbitListener(queues = AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE)
public void bindingsBeanExample(AnotherPayloadDto payload) {
log.info(
"Received new message in {} (AnotherPayloadDto): {}",
AmqpConstants.MULTI_PAYLOAD_QUEUE,
AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE,
payload.toString());
}

@RabbitListener(queues = AmqpConstants.MULTI_PAYLOAD_QUEUE)
@RabbitListener(queues = AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE)
public void bindingsBeanExample(ExamplePayloadDto payload) {
log.info(
"Received new message in {} (ExamplePayloadDto): {}",
AmqpConstants.MULTI_PAYLOAD_QUEUE,
AmqpConstants.QUEUE_MULTI_PAYLOAD_QUEUE,
payload.toString());
}

@RabbitListener(
autoStartup = "false",
queuesToDeclare = @Queue(name = AmqpConstants.QUEUE_CREATE, autoDelete = "false", durable = "true"))
public void queuesToDeclareCreate(Message message, @Payload GenericPayloadDto<String> payload) {
log.info(
"Received new message {} in {} (GenericPayloadDto<String>): {}",
message,
AmqpConstants.QUEUE_CREATE,
payload.toString());
}

@RabbitListener(
autoStartup = "false",
queuesToDeclare = @Queue(name = AmqpConstants.QUEUE_DELETE, autoDelete = "false", durable = "true"))
public void queuesToDeclareDelete(Message message, @Payload GenericPayloadDto<Long> payload) {
log.info(
"Received new message {} in {} (GenericPayloadDto<Long>): {}",
message,
AmqpConstants.QUEUE_DELETE,
payload.toString());
}

@RabbitListener(
autoStartup = "false",
bindings =
@QueueBinding(
exchange =
@Exchange(
name = AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_1,
type = ExchangeTypes.TOPIC),
key = AmqpConstants.ROUTING_KEY_ALL_MESSAGES,
value = @Queue(name = AmqpConstants.QUEUE_UPDATE, durable = "true", autoDelete = "false")))
public void bindingsUpdate(Message message, @Payload GenericPayloadDto<ExamplePayloadDto> payload) {
log.info(
"Received new message {} in {} (GenericPayloadDto<ExamplePayloadDto>): {}",
message,
AmqpConstants.QUEUE_UPDATE,
payload.toString());
}

@RabbitListener(
autoStartup = "false",
bindings =
@QueueBinding(
exchange =
@Exchange(
name = AmqpConstants.EXCHANGE_CRUD_TOPIC_EXCHANGE_2,
type = ExchangeTypes.TOPIC),
key = AmqpConstants.ROUTING_KEY_ALL_MESSAGES,
value = @Queue(name = AmqpConstants.QUEUE_READ, durable = "true", autoDelete = "false")))
public void bindingsRead(Message message, @Payload ExamplePayloadDto payload) {
log.info(
"Received new message {} in {} (ExamplePayloadDto): {}",
message,
AmqpConstants.QUEUE_UPDATE,
payload.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.examples.amqp.dtos;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import static io.swagger.v3.oas.annotations.media.Schema.RequiredMode.REQUIRED;

@Schema(description = "Generic payload model")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GenericPayloadDto<T> {

@Schema(description = "Generic Payload field", requiredMode = REQUIRED)
private T genericValue;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ public class AnotherProducer {
@AsyncPublisher(
operation =
@AsyncOperation(
channelName = AmqpConstants.EXAMPLE_TOPIC_EXCHANGE,
channelName = AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE,
description = "Custom, optional description defined in the AsyncPublisher annotation"))
@AmqpAsyncOperationBinding()
public void sendMessage(AnotherPayloadDto msg) {
// send
rabbitTemplate.convertAndSend(
AmqpConstants.EXAMPLE_TOPIC_EXCHANGE, AmqpConstants.EXAMPLE_TOPIC_ROUTING_KEY, msg);
AmqpConstants.EXCHANGE_EXAMPLE_TOPIC_EXCHANGE,
AmqpConstants.ROUTING_KEY_EXAMPLE_TOPIC_ROUTING_KEY,
msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void producerCanUseSpringwolfConfigurationToSendMessage() {
payload.setSomeEnum(FOO1);

// when
springwolfAmqpProducer.send(AmqpConstants.EXAMPLE_QUEUE, payload);
springwolfAmqpProducer.send(AmqpConstants.QUEUE_EXAMPLE_QUEUE, payload);

// then
verify(exampleConsumer, timeout(10000)).receiveExamplePayload(payload);
Expand Down
Loading

0 comments on commit 8c0950e

Please sign in to comment.