-
Notifications
You must be signed in to change notification settings - Fork 2
messaging setup
NetFusion messaging is an implementation based on the CQRS design pattern that is extendable and allows publishing messages to external consumers. The messaging topics are focused around dispatching messages to in-process message consumers. However, all of the topics discussed are applicable to publishing messages externally to message brokers such as RabbitMQ and Azure Service Bus. The following are the topics discussed:
- Publishing Domain-Events
- Sending Commands
- Executing Queries
- Message Routing
- Message Logging
- Message Publishers
- Message Enrichers
- Message Filters
The dispatching of messages to consumers are specified by defining a message router. The message router discussed in these topics specify how messages should be dispatched to in-memory consumers. Regardless of how a message is delivered, the same routing techniques for routing messages to in-memory consumers applies to routing messages using RabbitMQ or Azure Service Bus.
For example, when routing an in-memory message, the consumer to which a message of a specific type should be routed is specified. Similarly, when routing a message to an external message broker, the queue or topic to which a command or domain-event should be delivered is specified. The subscriber also specifies a route indicating to which in-memory consumer a message should be dispatched when received on a specific queue or topic. What differentiates an in-memory routing from an external message broker routing is the metadata specified when defining the route.
CQRS Design Pattern in Microservices Architectures
Deep Dive into CQRS — A Great Microservices Pattern
The messaging topics assume an example microservice was creating using the NetFusion microservice template. The example microservice can be created by completing the following steps.
If the netfusion-microservice template is not installed execute the following:
dotnet new install NetFusion.Templates
Next, create the example solution:
cd mkdir Examples.Messaging
cd Examples.Messaging
dotnet new netfusion-microservice --port 5670
The generated microservice is configured to use Serilog and SEQ. A SEQ server instance can be executed within Docker by running the following:
cd ./seq
docker-compose up -d
The SEQ log web interface can be viewed here: http://localhost:8051
There are three types of messages that can be dispatched: Commands, Queries, and Domain-Events. All types of messages derive from the base IMessage interface. Messages are defined by implementing the ICommand, IQuery or IDomainEvent interfaces. The Command, Query and DomainEvent classes provide base implementations from which application specific commands and domain-events can derive.
A command is a message indicating an action being requested altering the state of the microservice. Commands can have one and only one consumer. If more than one consumer is found, and exception is raised. Commands can also have an optional response set by the consumer's command-handler.
A query is a message executed to retrieve data maintained by the microservice. Queries and commands allow the reading and writing of data to be separate allowing queries to be more performant by using sources specifically for reading the data written by commands.
A domain-event is a message that notifies other application components that an action has taken place. This allows one or more consumer event-handlers to react to the event. Unlike a command, a domain-event does not have an associated response type but can have multiple event-handlers.
A domain-event source is not a type of message but any domain-entity implementing the IEventSource interface. This interface defines an enumeration of IDomainEvent instances. When a domain-entity implementing the IEventSource interface is published, the list of associated domain-events are published to consumers.
While NetFusion provides an implementation of the CQRS pattern, this does not mean it should be used as an overarching design for the entire microservice. For example, if a business service needs to lookup information based on a given identity value, defining a query would most likely be overkill. Likewise, creating a command to persist a few values would more easily be accomplished by directly calling a repository.
Commands, Queries, and Domain-Events should be reserved to model well defined entities belonging to the business process implemented by the microservice. Additionally, they are used when integrating microserivces by sending commands and publishing domain-events over a central message broker such as RabbitMQ or Azure Service Bus.
The messaging implementation is contained within the following NuGet packages:
- NetFusion.Messaging.Types
- NetFusion.Messaging
The NetFusion.Messaging.Types NuGet package contains the base message contracts used to define Command, Queries, and Domain-Events. The microservice solution template adds a reference to this NuGet package's assembly to the Domain project.
The NetFusion.Messaging contains the plugin used to route messages to in-process consumers and is not bootstrapped by the generated microservice solution and must be manually added. Add a reference to the NetFusion.Messaging NuGet package to the Infra project by completing the following:
dotnet add ./src/Components/Examples.Messaging.Infra package NetFusion.Messaging
Next, add the following line of code to add the plugin to the composite-container:
// Add Plugins to the Composite-Container:
builder.Services.CompositeContainer(builder.Configuration, new SerilogExtendedLogger())
.AddSettings()
.AddMessaging() // <-- Add this line
.AddPlugin<InfraPlugin>()
.AddPlugin<AppPlugin>()
.AddPlugin<DomainPlugin>()
.AddPlugin<WebApiPlugin>()
.Compose();
The following will add a message router configured to route messages in the upcoming topics. A message route is used to specify how a given published message is routed to its corresponding message consumer. As discussed above, the RabbitMQ and Azure Service Bus plugins also have their own message routers used to specify how commands and domain-events are published to queue and topics and routed to consumer message handlers.
Since message routing is an infrastructure concern, add the following class to the Routers directory of the Examples.Messaging.Infra project as follows:
using NetFusion.Messaging.InProcess;
namespace Examples.Messaging.Infra.Routers;
public class InMemoryRouter : MessageRouter
{
protected override void OnConfigureRoutes()
{
}
}
Consumer message handlers can be any of the following:
-
Synchronous method handler where the parameter is the type of the specific message to handle.
- If a handler for a command with a response, the handler method returns the response type. Otherwise the method has a void return type.
- If a handler for a query, the handler method returns the response type.
- If a handler for a domain-event, the handler method has a void return type.
-
Asynchronous method handler returning a task where the parameter is the type of specific message to handle with an optional cancelation token.
- If a handler for a command with a response, the handler method returns a Task of the response type. Otherwise the method returns just a Task.
- If a handler for a query, the handler method returns a Task of the response type.
- If a handler for a domain-event, the handler method has a Task return type.
Below is an example of a consumer for a domain-event message handler for reference:
using System;
using System.Threading;
using System.Threading.Tasks;
using Examples.Messaging.Domain.Events;
using Microsoft.Extensions.Logging;
namespace Examples.Messaging.App.Handlers;
public class GermanAutoSalesHandler
{
private readonly ILogger<GermanAutoSalesHandler> _logger;
public GermanAutoSalesHandler(ILogger<GermanAutoSalesHandler> logger)
{
_logger = logger;
}
public async Task OnRegistration(AutoSoldEvent domainEvent, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
_logger.LogInformation("Domain Event Received by {handler} for {make} and {model}.",
nameof(GermanAutoSalesHandler), domainEvent.Make, domainEvent.Model);
}
}
All message types are dispatched using IMessagingService and can be injected into any component registered within the dependency-injection container. This also applies to messages published out of process over a central message broker between microservices.
The below code shows a domain-event being published:
using Examples.Messaging.Domain.Events;
using Examples.Messaging.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging;
namespace Examples.Messaging.WebApi.Controllers;
[ApiController, Route("api/messaging")]
public class MessageController : ControllerBase
{
private readonly IMessagingService _messaging;
public MessageController(IMessagingService messaging)
{
_messaging = messaging;
}
[HttpPost("auto/sales")]
public async Task<IActionResult> AutoSalesCompleted([FromBody]AutoSalesModel model)
{
if (!ModelState.IsValid)
{
return BadRequest(ModelState);
}
var domainEvt = new AutoSoldEvent(
model.Make,
model.Model,
model.Year);
await _messaging.PublishAsync(domainEvt); // <-- Domain-Event being published
return Ok();
}
}
From the publisher's perspective, all domain-events appear to be published asynchronously. This completely decouples the publisher from the consumer. This concern is removed from the publisher and is the responsibility of the message dispatcher. If a message handler method is changed from being synchronous to asynchronous, the only code that needs to change is the message handler method and non of the calling code.
Since all message types derive from IMessage, they all have an Attributes dictionary property having string key and value pairs. This allows random key/value pairs to be associated with all message types. A string value type has been chosen over the object type since strings can be easily serialized and deserialized.
The NetFusion.Messaging.Types NuGet package contains extension methods used to set and retrieve attribute values and array of values stored as strings. For more details about setting message attributes for all dispatched messages, the topic titled Message Enrichers can be referenced.
-
Templates
-
Resources
-
Bootstrapping
-
Modules Details
-
Settings
-
Validation
-
Monitoring
- Setup
- Commands
- Queries
- Domain Events
- Message Logs
- Message Publishers
- Message Enrichers
- Message Filters
-
Azure Service Bus
-
RabbitMQ
-
Redis
-
MongoDB