-
Notifications
You must be signed in to change notification settings - Fork 2
integration.amqp.publisher
This section demonstrates how to send/publish command and domain-event messages to the defined Queue and Topic.
As with all messaging, regardless of implementation, commands and domain-events are sent/published using the IMessagingService service interface. The calling code publishing the message, is not coupled to the underlying implementation used to deliver the message. For example, the underlying messaging implementation could be updated from RabbitMQ to AMQP without having to change the calling code. In a addition, the message could be delivered over multiple implementations without the caller's knowledge.
The below configuration within the appsettings.json is needed by the publishing application.
{
"netfusion": {
"amqp": {
"hosts": [
{
"hostName": "claims-bus",
"hostAddress": "claims-bus.servicebus.windows.net",
"username": "Service-Bus-User",
"password": "Service-Bus-User-Password"
}
]
}
}
}
Also, the NetFusion.AMQP NuGet containing the AMQP plugin must be referenced and added to the composite-container by calling the AddAmqp method:
public void ConfigureServices(IServiceCollection services)
{
services.CompositeContainer(_configuration, new SerilogExtendedLogger())
.AddAmqp()
.AddPlugin<InfraPlugin>()
.AddPlugin<AppPlugin>()
.AddPlugin<DomainPlugin>()
.AddPlugin<WebApiPlugin>()
.Compose();
services.AddControllers();
}
The following defines a CreateClaimSubmission command representing a new insurance claim submission. The command will contain the information required to establish the new claim within the system. How the command is actually handled is the responsibility of the consumer that subscribes to the queue. Complete the following steps to define the command and send it to the queue:
Define the command as follows within the Commands directory contained within the Demo.Domain project:
touch ./src/Demo.Domain/Commands/CreateClaimSubmission.cs
nano ./src/Demo.Domain/Commands/CreateClaimSubmission.cs
using NetFusion.Messaging.Types;
namespace Demo.Domain.Commands
{
public class CreateClaimSubmission : Command
{
public string InsuredId { get; set; }
public string InsuredFistName { get; set; }
public string InsuredLastName { get; set; }
public decimal InsuredDeductible { get; set; }
public decimal ClaimEstimate { get; set; }
public string ClaimDescription { get; set; }
}
}
Once the command is defined, the publisher needs to map the command to its associated queue by defining an implementation of the IHostRegistry interface. The base class named, HostRegistryBase contains a base implementation from which the application can derive. This derived class will usually be located within the infrastructure assembly of the host application.
Define the following derived class within the Demo.Infra project:
touch ./src/Demo.Infra/HostItemRegistry.cs
nano ./src/Demo.Infra/HostItemRegistry.cs
using Demo.Domain.Commands;
using NetFusion.AMQP.Publisher;
namespace Demo.Infra
{
public class HostItemRegistry : HostRegistryBase
{
public override string Namespace { get; } = "claims-bus";
public override void OnRegister()
{
AddQueue<CreateClaimSubmission>("claim-submissions");
}
}
}
The above specifies that when the CreateClaimSubmission command is sent, it should be delivered to the claims-submissions queue defined within the claims-bus namespace.
Define WebApi controller to send the command as follows to the Demo.WebApi project:
touch ./src/Demo.WebApi/Controllers/AmqpController.cs
nano ./src/Demo.WebApi/Controllers/AmqpController.cs
using System;
using System.Threading.Tasks;
using Demo.Domain.Commands;
using Demo.Domain.Events;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging;
namespace Demo.WebApi.Controllers
{
[Route("api/integration/amqp")]
[ApiController]
public class AmqpController : ControllerBase
{
private readonly IMessagingService _messaging;
public AmqpController(IMessagingService messaging)
{
_messaging = messaging ?? throw new ArgumentNullException(nameof(messaging));
}
[HttpPost("submission")]
public async Task<IActionResult> SendClaimSubmission([FromBody]CreateClaimSubmission submission)
{
await _messaging.SendAsync(submission);
return Ok();
}
}
}
The following will define a ClaimStatusUpdated domain-event that is published when the status of an existing claim has been updated. How the domain-event is handled is the responsibility fo the consumer that subscribes to the topic. Complete the following steps to define the domain-event and publish it to the topic.
Define the domain-event as follows within the Events directory contained within the Demo.Domain project:
touch ./src/Demo.Domain/Events/ClaimStatusUpdated.cs
nano ./src/Demo.Domain/Events/ClaimStatusUpdated.cs
using System;
using NetFusion.Messaging.Types;
namespace Demo.Domain.Events
{
public class ClaimStatusUpdated : DomainEvent
{
public string InsuredId { get; set; }
public string CurrentStatus { get; set; }
public DateTime NextStatusUpdate { get; set; }
public string NextStatus { get; set; }
}
}
Once the domain-event is defined, the publisher needs to map the domain-event to its associated topic by defining an implementation of the IHostRegistry interface. The base class named, HostRegistryBase contains a base implementation from which the application can derive. This derived class will usually be located within the infrastructure assembly of the host application.
Update the HostItemRegistry by registering the ClaimStatusUpdated domain-event:
nano ./src/Demo.Infra/HostItemRegistry.cs
using NetFusion.AMQP.Publisher;
using Service.Domain.Events;
namespace Demo.Infra
{
public class HostItemRegistry : HostRegistryBase
{
public override string Namespace { get; } = "claims-bus";
public override void OnRegister()
{
AddTopic<ClaimStatusUpdated>("claim-status-notification");
}
}
}
The above specifies that when the ClaimStatusUpdated domain-event is published, it should be delivered to the claim-status-notification topic defined within the claims-bus namespace.
Updated the AmqpController controller to publish the domain-event as follows:
nano ./src/Demo.WebApi/Controllers/AmqpController.cs
using System;
using System.Threading.Tasks;
using Demo.Domain.Commands;
using Demo.Domain.Events;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging;
namespace Demo.WebApi.Controllers
{
[Route("api/integration/amqp")]
[ApiController]
public class AmqpController : ControllerBase
{
private readonly IMessagingService _messaging;
public AmqpController(IMessagingService messaging)
{
_messaging = messaging ?? throw new ArgumentNullException(nameof(messaging));
}
[HttpPost("status-updated")]
public async Task<IActionResult> PublishClaimStatus([FromBody] ClaimStatusUpdated status)
{
await _messaging.PublishAsync(status);
return Ok();
}
}
}
-
Templates
-
Resources
-
Bootstrapping
-
Modules Details
-
Settings
-
Validation
-
Monitoring
- Setup
- Commands
- Queries
- Domain Events
- Message Logs
- Message Publishers
- Message Enrichers
- Message Filters
-
Azure Service Bus
-
RabbitMQ
-
Redis
-
MongoDB