integration rabbitmq topic
A topic exchange routes each message by comparing its route key, a period-delimited string, against the patterns with which queues are bound to the exchange. The message is delivered to every queue bound with a matching pattern. Domain-events are published to topic exchanges and are often used for microservice integration. The values within the RouteKey are often based on property values of the associated domain-event.
The publisher defines the exchange to which a given domain-event will be published. However, the publisher does not know all the consumers of the exchange. The consumers, knowing the exchange name, define queues that receive the events whose route keys match the specified patterns. If a consumer is not running, the message remains queued and is delivered when the subscriber starts.
The message publisher specifies a RouteKey containing a period-delimited value based on a pattern agreed between the publisher and subscriber.
For example, the following could be a RouteKey for a domain-event used to update other microservices that an automobile has been sold: VW.AllTrack.2017
For the above example route-key, the following are valid matching patterns, where * stands for exactly one period-delimited word:
- VW.*.2017 (all 2017 VW models)
- VW.AllTrack.* (for all years of the same make and model)
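The matching rule can be illustrated with a small stand-alone sketch. This is not NetFusion or RabbitMQ code; the class name TopicPattern is hypothetical, and the broker performs this matching itself. The sketch follows the RabbitMQ topic syntax, in which * stands for exactly one word and # (not used in the examples on this page) stands for zero or more words.

```csharp
using System;

// Hypothetical helper that illustrates topic-pattern matching; the real
// matching is done by the RabbitMQ broker, not by application code.
public static class TopicPattern
{
    // Returns true when the route key satisfies the binding pattern.
    public static bool Matches(string pattern, string routeKey)
    {
        string[] patternWords = pattern.Split('.');
        string[] keyWords = routeKey.Split('.');
        return Match(patternWords, 0, keyWords, 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == k.Length) return false;

        bool wordMatches = p[pi] == "*" || p[pi] == k[ki];
        return wordMatches && Match(p, pi + 1, k, ki + 1);
    }
}
```

Under these rules, TopicPattern.Matches("VW.*.2017", "VW.AllTrack.2017") returns true, while a pattern with a different first word, such as Buick.*.2019, does not match. A two-word pattern such as VW.* also fails against a three-word key, because * never spans more than one word.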
Add the following domain-event class to the Examples.RabbitMQ.Domain/Events directory:
using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Events;

public class AutoSaleCompleted : DomainEvent
{
    public string Make { get; }
    public string Model { get; }
    public int Year { get; }
    public string? Color { get; init; }
    public bool? IsNew { get; init; }

    public AutoSaleCompleted(string make, string model, int year)
    {
        Make = make;
        Model = model;
        Year = year;
    }
}
Add the following code to the OnDefineEntities method of RabbitMqBusRouter to specify that an AutoSaleCompleted domain-event should be delivered to the CompletedAutoSales topic exchange when published:
protected override void OnDefineEntities()
{
    DefineExchange<AutoSaleCompleted>(exchange =>
    {
        exchange.ExchangeType = ExchangeType.Topic;
        exchange.ExchangeName = "CompletedAutoSales";
        exchange.AlternateExchangeName = "NonRouted_CompletedAutoSales";

        exchange.WhenDomainEvent(m => m.Make != "Yugo");
        exchange.RouteKey(m => $"{m.Make}.{m.Model}.{m.Year}");
    });
}
The above routing does the following:
- Creates a topic exchange named CompletedAutoSales.
- Specifies that the route key is built from the values of the Make, Model, and Year properties, separated by periods.
- Specifies a predicate so that any published domain-event with a Make of Yugo is not delivered to the exchange.
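To make the predicate and route-key lambdas concrete, here is a minimal sketch that evaluates the same two expressions outside of NetFusion. The AutoSale record and the AutoSaleRouting class are hypothetical stand-ins introduced only for illustration; they are not part of the library.

```csharp
using System;

// Hypothetical stand-in for AutoSaleCompleted that keeps only the
// properties used for routing.
public record AutoSale(string Make, string Model, int Year);

public static class AutoSaleRouting
{
    // Same expression as exchange.WhenDomainEvent(m => m.Make != "Yugo").
    public static bool ShouldPublish(AutoSale sale) => sale.Make != "Yugo";

    // Same expression as exchange.RouteKey(m => $"{m.Make}.{m.Model}.{m.Year}").
    public static string BuildRouteKey(AutoSale sale) =>
        $"{sale.Make}.{sale.Model}.{sale.Year}";
}
```

For a 2017 VW AllTrack, BuildRouteKey produces VW.AllTrack.2017. Because the period is the word delimiter, a property value that itself contains a period would add extra words to the key, so such values may need to be normalized before they are used in a route key.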
The subscribing microservice defines a route specifying the method to be called when a domain-event with a matching route key is published to the exchange. Within the directory Examples.RabbitMQ.App/Handlers, define the following handler:
using System;
using Examples.RabbitMQ.Domain.Events;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class AutoSalesHandler
{
    public void OnGermanAutoSales(AutoSaleCompleted completedSale)
    {
        Console.WriteLine(nameof(OnGermanAutoSales));
        Console.WriteLine(completedSale.ToIndentedJson());
    }

    public void OnAmericanAutoSales(AutoSaleCompleted completedSale)
    {
        Console.WriteLine(nameof(OnAmericanAutoSales));
        Console.WriteLine(completedSale.ToIndentedJson());
    }
}
Define the following routes to dispatch domain-events received on a specific queue to the above handler methods:
using EasyNetQ.Topology;
using Examples.RabbitMQ.App.Handlers;
using Examples.RabbitMQ.Domain.Events;
using NetFusion.Integration.RabbitMQ;

namespace Examples.RabbitMq.Infra;

public class RabbitMqBusRouter : RabbitMqRouter
{
    public RabbitMqBusRouter() : base("testBus")
    {
    }

    protected override void OnDefineEntities()
    {
        SubscribeToExchange<AutoSaleCompleted>("CompletedAutoSales",
            new[] { "VW.*.2017", "BMW.*.2018" },
            route =>
            {
                route.ToConsumer<AutoSalesHandler>(c => c.OnGermanAutoSales,
                    meta =>
                    {
                        meta.QueueName = "GermanAutosSales";
                    });
            });

        SubscribeToExchange<AutoSaleCompleted>("CompletedAutoSales",
            new[] { "Chevy.Corvette.*", "Buick.*.2019" },
            route =>
            {
                route.ToConsumer<AutoSalesHandler>(c => c.OnAmericanAutoSales,
                    meta =>
                    {
                        meta.QueueName = "AmericanAutosSales";
                    });
            });
    }
}
Add a model to the following directory: Examples.RabbitMQ.WebApi/Models
using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class AutoSalesModel
{
    [Required] public string Make { get; set; } = string.Empty;
    [Required] public string Model { get; set; } = string.Empty;
    [Required] public string Color { get; set; } = string.Empty;
    public int Year { get; set; }
    public bool IsNew { get; set; }
}
Add a controller to the following directory: Examples.RabbitMQ.WebApi/Controllers
using Examples.RabbitMQ.Domain.Events;
using Examples.RabbitMQ.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging; // assumed namespace of IMessagingService

namespace Examples.RabbitMQ.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public ExampleController(
        IMessagingService messaging)
    {
        _messaging = messaging;
    }

    [HttpPost("auto/sales")]
    public async Task<IActionResult> AutoSales(AutoSalesModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        // Map the posted model to the domain-event and publish it to the exchange.
        var domainEvent = new AutoSaleCompleted(model.Make, model.Model, model.Year)
        {
            Color = model.Color,
            IsNew = model.IsNew
        };

        await _messaging.PublishAsync(domainEvent);
        return Ok();
    }
}
Complete the following to run the example microservice and send an HTTP POST request to the example controller:
cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run
Post the following requests to: http://localhost:5000/api/example/auto/sales
{
    "make": "BMW",
    "model": "850",
    "year": 2018,
    "color": "Red",
    "isNew": false
}

{
    "make": "Chevy",
    "model": "Corvette",
    "year": 2030,
    "color": "White",
    "isNew": true
}
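The requests can be posted with any HTTP client. As one option, the following sketch builds and sends the same request with HttpClient. The AutoSalesClient class and its method names are hypothetical; the URL and the JSON property names are the ones shown above.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical test client for the example controller.
public static class AutoSalesClient
{
    // URL given in the text above; adjust it if the service listens elsewhere.
    public const string Endpoint = "http://localhost:5000/api/example/auto/sales";

    // Builds the POST request with a JSON body shaped like the examples above.
    public static HttpRequestMessage BuildRequest(
        string make, string model, int year, string color, bool isNew)
    {
        string json = JsonSerializer.Serialize(new { make, model, year, color, isNew });

        return new HttpRequestMessage(HttpMethod.Post, Endpoint)
        {
            Content = new StringContent(json, Encoding.UTF8, "application/json")
        };
    }

    // Sends the request and returns the numeric HTTP status code.
    public static async Task<int> SendAsync(HttpClient client, HttpRequestMessage request)
    {
        using HttpResponseMessage response = await client.SendAsync(request);
        return (int)response.StatusCode;
    }
}
```

Calling BuildRequest("BMW", "850", 2018, "Red", false) and passing the result to SendAsync with a shared HttpClient instance posts the first request above. The microservice must be running for the call to succeed.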
Since the IsAutoCreateEnabled configuration property was set to true within the Program.cs file when bootstrapping the microservice, the specified exchange and queues are created automatically.
When the CompletedAutoSales exchange was created, the AlternateExchangeName was set to NonRouted_CompletedAutoSales. When a domain-event is published and can't be delivered to any of the bound queues with a matching RouteKey pattern, it is sent to this exchange. A queue with the same name is created and bound to the exchange to store any non-delivered messages.
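The fallback behavior can be modeled in a few lines. The sketch below is an in-memory illustration only, not how the broker or NetFusion is implemented. The TopicRouter class is hypothetical, and its matcher handles only the * wildcard used on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a topic exchange with an alternate queue.
public static class TopicRouter
{
    // '*' stands for exactly one period-delimited word; '#' is not handled here.
    public static bool Matches(string pattern, string routeKey)
    {
        string[] p = pattern.Split('.');
        string[] k = routeKey.Split('.');
        return p.Length == k.Length &&
               p.Zip(k, (pw, kw) => pw == "*" || pw == kw).All(ok => ok);
    }

    // Returns the queues that receive the message, or only the alternate
    // queue when no binding pattern matches the route key.
    public static IReadOnlyList<string> Route(
        IReadOnlyDictionary<string, string[]> bindings,
        string alternateQueue,
        string routeKey)
    {
        List<string> matched = bindings
            .Where(b => b.Value.Any(pattern => Matches(pattern, routeKey)))
            .Select(b => b.Key)
            .ToList();

        return matched.Count > 0 ? matched : new List<string> { alternateQueue };
    }
}
```

With the bindings defined earlier (GermanAutosSales bound with VW.*.2017 and BMW.*.2018, AmericanAutosSales bound with Chevy.Corvette.* and Buick.*.2019), the key BMW.850.2018 routes to GermanAutosSales, while a key that matches no pattern falls through to NonRouted_CompletedAutoSales.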
To test non-delivered messages being delivered to the alternate exchange/queue, post the following:
{
    "make": "Honda",
    "model": "Accord",
    "year": 2021,
    "color": "Silver",
    "isNew": false
}
The NonRouted_CompletedAutoSales queue should now contain one message.
- If a message is published and no queue has a pattern matching the RouteKey, the message is discarded unless the exchange has an alternate exchange configured, as in the example above.
- When multiple instances of the same microservice are running, the messages on a shared queue are delivered round-robin across the instances.
- Multiple patterns can be specified for a single handler/queue binding.
- The route key patterns can be specified externally within the application configuration.