integration rabbitmq fanout
Unlike the direct and topic exchanges, a fan-out exchange does not use the RouteKey to determine the queues to which a message should be delivered. Instead, the exchange dispatches the message to all bound queues.
Domain-events associated with a fan-out exchange do not utilize the route-key.
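The routing difference described above can be illustrated with a small in-memory sketch. It does not use the NetFusion or RabbitMQ client APIs; the `Binding` record, the `ExchangeSketch` class, and the queue names are hypothetical and exist only to show that a fan-out exchange ignores the route-key.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of exchange bindings (not a NetFusion type).
public record Binding(string QueueName, string RouteKey);

public static class ExchangeSketch
{
    // Direct exchange: only queues whose binding key equals the route-key receive the message.
    public static List<string> RouteDirect(IEnumerable<Binding> bindings, string routeKey) =>
        bindings.Where(b => b.RouteKey == routeKey).Select(b => b.QueueName).ToList();

    // Fan-out exchange: every bound queue receives the message; the route-key is ignored.
    public static List<string> RouteFanout(IEnumerable<Binding> bindings, string routeKey) =>
        bindings.Select(b => b.QueueName).ToList();
}

public static class FanoutRoutingDemo
{
    public static void Main()
    {
        var bindings = new[]
        {
            new Binding("QueueA", "north"),
            new Binding("QueueB", "south")
        };

        // Prints: QueueA
        Console.WriteLine(string.Join(",", ExchangeSketch.RouteDirect(bindings, "north")));

        // Prints: QueueA,QueueB
        Console.WriteLine(string.Join(",", ExchangeSketch.RouteFanout(bindings, "north")));
    }
}
```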
Add the following domain-event class to the Examples.RabbitMQ.Domain/Events directory:
using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Events;

public class ConfigurationUpdated : DomainEvent
{
    public string MinLogLevel { get; }
    public bool ClearStatistics { get; }
    public int RestoreAfterSeconds { get; }

    public ConfigurationUpdated(string minLogLevel, bool clearStatistics, int restoreAfterSeconds)
    {
        MinLogLevel = minLogLevel;
        ClearStatistics = clearStatistics;
        RestoreAfterSeconds = restoreAfterSeconds;
    }
}
Add the following code to the RabbitMqBusRouter OnDefineEntities method to specify that when a ConfigurationUpdated domain-event is published, it should be delivered to the ConfigurationUpdates fan-out exchange:
DefineExchange<ConfigurationUpdated>(exchange =>
{
    exchange.ExchangeType = ExchangeType.Fanout;
    exchange.ExchangeName = "ConfigurationUpdates";
    exchange.IsDurable = false;
});
The subscribing microservice defines a route specifying the method to be called when a domain-event is published to the exchange. Define the following handler:
using System;
using Examples.RabbitMQ.Domain.Events;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class ConfigurationHandler
{
    public void OnConfigurationUpdate(ConfigurationUpdated domainEvent)
    {
        Console.WriteLine(nameof(OnConfigurationUpdate));
        Console.WriteLine(domainEvent.ToIndentedJson());
    }
}
Define the following route to dispatch domain-events, received on a queue bound to the exchange, to the above handler method:
SubscribeToFanOutExchange<ConfigurationUpdated>("ConfigurationUpdates",
    route => route.ToConsumer<ConfigurationHandler>(
        c => c.OnConfigurationUpdate, queue =>
        {
            queue.QueueName = "UpdateServiceConfiguration";
            queue.IsAutoDelete = true;
        }), isPerServiceInstance: false);
The above completes the following:
- Defines a queue named UpdateServiceConfiguration bound to the ConfigurationUpdates exchange
- Specifies that the queue should be deleted when all consuming microservices are stopped
- Sets isPerServiceInstance to false, indicating that a single queue should be created to which all microservices subscribe. This results in the messages being delivered round-robin among the subscribed microservices.
Add a corresponding model to the following directory: Examples.RabbitMQ.WebApi/Models
using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class ConfigurationModel
{
    [Required] public string MinLogLevel { get; set; } = string.Empty;
    public bool ClearStatistics { get; set; }
    public int RestoreAfterSeconds { get; set; }
}
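Then define the following controller, which publishes the domain-event when the model is posted:
using Examples.RabbitMQ.Domain.Events;
using Examples.RabbitMQ.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging; // IMessagingService (namespace assumed)

namespace Examples.RabbitMQ.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public ExampleController(
        IMessagingService messaging)
    {
        _messaging = messaging;
    }

    [HttpPost("services/configuration")]
    public async Task<IActionResult> UpdateConfiguration(ConfigurationModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        // Map the posted model to the domain-event and publish it to the fan-out exchange.
        var domainEvent = new ConfigurationUpdated(model.MinLogLevel, model.ClearStatistics, model.RestoreAfterSeconds);
        await _messaging.PublishAsync(domainEvent);
        return Ok();
    }
}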
Run the example microservice and send an HTTP POST request to the example controller:
cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run
Post the following request to: http://localhost:5000/api/example/services/configuration
{
    "minLogLevel": "Debug",
    "clearStatistics": true,
    "restoreAfterSeconds": 60
}
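If no HTTP client tool is at hand, the request can also be sent from a small console program. The following is a minimal sketch using `HttpClient` and `JsonContent` from the .NET base class library; the `ConfigurationRequestDemo` class name is hypothetical, and the URL assumes the service listens on port 5000 as stated above.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;

public static class ConfigurationRequestDemo
{
    // URL taken from the text above; adjust the port if the service listens elsewhere.
    public const string Url = "http://localhost:5000/api/example/services/configuration";

    // Builds a POST request carrying the same JSON body shown above.
    public static HttpRequestMessage BuildRequest() =>
        new HttpRequestMessage(HttpMethod.Post, Url)
        {
            Content = JsonContent.Create(new
            {
                minLogLevel = "Debug",
                clearStatistics = true,
                restoreAfterSeconds = 60
            })
        };

    public static async Task Main()
    {
        using var client = new HttpClient();
        using var request = BuildRequest();

        // Requires the example microservice to be running.
        using var response = await client.SendAsync(request);
        Console.WriteLine($"Status: {(int)response.StatusCode}");
    }
}
```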
If the above request is made several times while two microservice instances are running, the domain-event is delivered round-robin between them. This is the behavior when isPerServiceInstance is specified as false.
Next, the above example will be repeated but with isPerServiceInstance set to true. This results in each running microservice instance having its own assigned queue, so every running instance is delivered the message.
Setting isPerServiceInstance to true can be used to broadcast a message to all microservice instances running within a container orchestrator such as Kubernetes. This is appropriate when a given message is relevant to every running instance. The above example simulates an event that is published when the log level should be changed. In this case, the message should be delivered to all running microservice instances and not round-robin to a single instance.
When isPerServiceInstance is set to true, two independent queues are created, one for each running microservice instance.
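The effect of the two isPerServiceInstance settings can be sketched with a small in-memory simulation. This is only an idealized illustration under stated assumptions: it makes no RabbitMQ or NetFusion calls, the `PerInstanceSketch` class and the instance names are hypothetical, and strict round-robin ordering is assumed for the shared queue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the two subscription modes.
public static class PerInstanceSketch
{
    // Returns, for each published message, the names of the instances that receive it.
    public static List<List<string>> Deliver(
        IReadOnlyList<string> instances, int messageCount, bool isPerServiceInstance)
    {
        var deliveries = new List<List<string>>();
        for (var i = 0; i < messageCount; i++)
        {
            deliveries.Add(isPerServiceInstance
                // One queue per instance: the fan-out exchange copies the message to every queue.
                ? instances.ToList()
                // One shared queue: the consumers take turns (idealized round-robin).
                : new List<string> { instances[i % instances.Count] });
        }
        return deliveries;
    }

    public static void Main()
    {
        var instances = new[] { "instance-1", "instance-2" };

        foreach (var perInstance in new[] { false, true })
        {
            Console.WriteLine($"isPerServiceInstance: {perInstance}");
            var deliveries = Deliver(instances, 3, perInstance);
            for (var i = 0; i < deliveries.Count; i++)
            {
                Console.WriteLine($"  message {i + 1} -> {string.Join(", ", deliveries[i])}");
            }
        }
    }
}
```

With the shared queue, each message reaches exactly one instance; with per-instance queues, each message reaches both.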