integration rabbitmq workqueue response
This messaging pattern is similar to the Work Queue pattern; however, the publishing microservice expects an asynchronous response on a reply queue. When the response is received, it can be correlated with the original request.
When the handler of the microservice defining the queue returns a response, it is routed back to the publishing microservice on the specified reply queue.
The subscribing microservice might not be able to return a response directly from the handler method. In a business application, the handler might save the received command and return a response only after a certain business process has taken place. In this case, the IQueueResponseService can be used to send the response once it is ready. Using the IQueueResponseService is covered in an upcoming example.
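The deferred-response scenario described above can be sketched without any messaging library. The following is a minimal in-memory illustration and not NetFusion's API: PendingRequest, DeferredResponder, and the publish delegate are hypothetical names introduced only for this sketch, and the actual IQueueResponseService signature is not shown here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical record of what a subscriber must remember to reply later.
public record PendingRequest(string CorrelationId, string ReplyQueue, string Vin);

// Hypothetical stand-in for a service that responds after a business process completes.
public class DeferredResponder
{
    private readonly Dictionary<string, PendingRequest> _pending = new();

    // Assumed transport callback: (replyQueue, correlationId, responseBody).
    private readonly Action<string, string, string> _publish;

    public DeferredResponder(Action<string, string, string> publish) => _publish = publish;

    // Called by the handler: store the request instead of answering immediately.
    public void Save(PendingRequest request) => _pending[request.CorrelationId] = request;

    // Called later: send the response to the reply queue recorded with the request.
    // Returns false when no pending request matches the correlation ID.
    public bool Complete(string correlationId, string responseBody)
    {
        if (!_pending.Remove(correlationId, out var request))
        {
            return false;
        }

        _publish(request.ReplyQueue, correlationId, responseBody);
        return true;
    }
}
```

In this sketch the handler would call Save when the command arrives, and the business process would call Complete once the report is ready. Each request can be completed only once because it is removed from the pending set when the response is sent.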
Add the following command classes to the Examples.RabbitMQ.Domain/Commands directory:
using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

public class GenerateCarFaxReport : Command<CarFaxReport>
{
    public string Vin { get; }
    public string State { get; }

    public GenerateCarFaxReport(string vin, string state)
    {
        Vin = vin;
        State = state;
    }
}
The above command declares CarFaxReport as its response type. The CarFaxReport command is defined as follows:
using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

public class CarFaxReport : Command
{
    public string Make { get; }
    public string Model { get; }
    public int Year { get; }
    public int Score { get; }

    public CarFaxReport(string make, string model, int year, int score)
    {
        Make = make;
        Model = model;
        Year = year;
        Score = score;
    }
}
The subscribing microservice defines the queue and a route specifying the method to be called when a command is sent to the queue. Once the message is received by the handler, the method returns an object of the command's response type. This response command is delivered back to the publishing microservice on the specified reply queue. Define the following handler within this directory: Examples.RabbitMQ.App/Handlers
using System;
using Examples.RabbitMQ.Domain.Commands;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class CarFaxGenerationHandler
{
    public CarFaxReport GenerateCarFax(GenerateCarFaxReport command)
    {
        // Log the name of this handler followed by the received command.
        Console.WriteLine(nameof(CarFaxGenerationHandler));
        Console.WriteLine(command.ToIndentedJson());

        // Return a canned report selected by the requested state.
        return command.State switch
        {
            "CT" => new CarFaxReport("Volvo", "970", 1989, 100),
            "NC" => new CarFaxReport("Audi", "R8", 2017, 350),
            _ => new CarFaxReport("Yugo", "GL", 1987, 1)
        };
    }
}
Define the following route to dispatch the received command to the above handler method when received on the associated queue. Add the following code to the OnDefineEntities method of the RabbitMqBusRouter class: Examples.RabbitMQ.Infra/Routers/RabbitMqBusRouter.cs
DefineQueueWithResponse<GenerateCarFaxReport, CarFaxReport>(route =>
{
    route.ToConsumer<CarFaxGenerationHandler>(c => c.GenerateCarFax, queue =>
    {
        queue.QueueName = "GenerateCarFaxReports";
    });
});
The above routing completes the following:
- Defines a queue named GenerateCarFaxReports.
- When a command is received on the GenerateCarFaxReports queue, it should be dispatched to the GenerateCarFax method of the CarFaxGenerationHandler class.
The publishing microservice defines a route to deliver the message of type GenerateCarFaxReport to the GenerateCarFaxReports queue when sent. It also specifies the handler to which the response message should be dispatched and the corresponding reply queue. Define the following handler within this directory: Examples.RabbitMQ.App/Handlers
using System;
using Examples.RabbitMQ.Domain.Commands;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class CarFaxReportHandler
{
    public void ReportGenerated(CarFaxReport command)
    {
        Console.WriteLine(nameof(CarFaxReportHandler));
        Console.WriteLine(command.ToIndentedJson());
    }
}
Add the following code to the RabbitMqBusRouter OnDefineEntities method: Examples.RabbitMQ.Infra/Routers/RabbitMqBusRouter.cs
RouteToQueueWithResponse<GenerateCarFaxReport, CarFaxReport>("GenerateCarFaxReports", route =>
{
    route.ToConsumer<CarFaxReportHandler>(c => c.ReportGenerated, queue =>
    {
        queue.QueueName = "CompletedCarFaxReports";
    });
});
The above routing completes the following:
- Instead of calling the RouteToQueue method, the RouteToQueueWithResponse method is called to indicate a command of type GenerateCarFaxReport should be delivered to the GenerateCarFaxReports queue when sent.
- Specifies that a queue named CompletedCarFaxReports should be created to receive the response message of type CarFaxReport, and that each received response should be dispatched to the ReportGenerated method of the CarFaxReportHandler.
When the publishing microservice sends a command with an expected response, it must specify a MessageId or CorrelationId so that it can match the response back to the original published command. This can be done manually in code or automatically by adding a message enricher to the message pipeline to mark each published message with these identity values. The Setup topic shows how to add message enrichers.
using System.Threading.Tasks;
using Examples.RabbitMQ.Domain.Commands;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging; // Assumed namespace of IMessagingService.

namespace Examples.RabbitMQ.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public ExampleController(
        IMessagingService messaging)
    {
        _messaging = messaging;
    }

    [HttpPost("generate/carfax/{vin}/{state}")]
    public async Task<IActionResult> GenerateCarFax(string vin, string state)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        // Send the command; the response arrives later on the reply queue.
        var command = new GenerateCarFaxReport(vin, state);
        await _messaging.SendAsync(command);
        return Ok();
    }
}
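To illustrate why a published command needs a MessageId or CorrelationId, the following sketch shows one way a publisher could remember outstanding requests and match each response to its original command. It is a library-independent illustration under stated assumptions: ResponseCorrelator, Track, and TryMatch are hypothetical names and are not part of NetFusion, which can set these identity values through a message enricher.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;

// Hypothetical helper that remembers outstanding requests by correlation ID.
public class ResponseCorrelator<TRequest>
{
    private readonly ConcurrentDictionary<string, TRequest> _outstanding = new();

    // Assigns a new correlation ID to the request and records it before publishing.
    public string Track(TRequest request)
    {
        var correlationId = Guid.NewGuid().ToString();
        _outstanding[correlationId] = request;
        return correlationId;
    }

    // Looks up and removes the request that a response belongs to.
    // Returns false when the correlation ID is unknown or was already matched.
    public bool TryMatch(string correlationId, [MaybeNullWhen(false)] out TRequest request) =>
        _outstanding.TryRemove(correlationId, out request);
}
```

In this sketch the publisher would call Track before sending the command and TryMatch inside the response handler, using the correlation ID carried by the response. A response whose ID is unknown, or that arrives a second time, is simply not matched.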
Complete the following to run the example microservice and send an HTTP POST request to the example controller:
cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run
Post a request to: http://localhost:5000/api/example/generate/carfax/V234SDS234/FL
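The request can also be issued from code. The following is a minimal sketch using HttpClient, assuming the example service is listening at the address shown above; CarFaxClient and its methods are hypothetical names introduced for illustration.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical client for the example controller's generate/carfax route.
public static class CarFaxClient
{
    // Builds the URL for the route api/example/generate/carfax/{vin}/{state}.
    public static Uri BuildUri(string baseUrl, string vin, string state) =>
        new Uri($"{baseUrl.TrimEnd('/')}/api/example/generate/carfax/" +
                $"{Uri.EscapeDataString(vin)}/{Uri.EscapeDataString(state)}");

    // Sends the POST with no body, since both values travel in the URL path.
    // Returns true when the service answers with a success status code.
    public static async Task<bool> RequestReportAsync(
        HttpClient client, string baseUrl, string vin, string state)
    {
        using var response = await client.PostAsync(BuildUri(baseUrl, vin, state), content: null);
        return response.IsSuccessStatusCode;
    }
}
```

For example, `await CarFaxClient.RequestReportAsync(new HttpClient(), "http://localhost:5000", "V234SDS234", "FL")` would post to the URL above. A success result only means the command was sent; the report itself arrives later on the reply queue.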
The above shows that the subscribing microservice received the command on the GenerateCarFaxReports queue and responded by sending a reply message on the reply queue specified by the publishing microservice. The second arrow shows the response command delivered to the reply queue and dispatched to the message handler.
The following shows the receiving and reply queues that were created: