
integration rabbitmq workqueue service

Brian Greco edited this page Mar 18, 2023 · 3 revisions

RabbitMQ: Work Queue - Response Service

This example shows how to use the IQueueResponseService to send a response back to the sender of the original command. The Work Queue - with Response example returns the response to the sender as soon as the command is processed from the queue: the message returned from the command's handler is sent directly back to the original sender by placing it on the sender's specified reply queue for future processing.

However, for some commands the response might not be available or known to the message handler when it is called. The handler may instead save the command and reply much later, once the information needed to create the response is available. The IQueueResponseService can be injected into a business service component and used to send a reply to a previously received and saved command.

Overview

This example shows a subscribing microservice that defines a queue named GenerateCarServiceReports, on which it receives commands of type GenerateServiceReport and saves them. Once the data needed to create the ServiceReport response is provided, the saved command is used to send the corresponding response back to the originating microservice.
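The flow described above can be pictured without any messaging library. The following is a minimal sketch, assuming the common RabbitMQ request/reply convention in which each command carries a correlation ID and the name of a reply queue; how NetFusion implements this internally is not shown on this page. The Envelope and InMemoryBroker types are hypothetical stand-ins for illustration only, not NetFusion or RabbitMQ APIs.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var broker = new InMemoryBroker();
var pending = new Dictionary<string, Envelope>();   // subscriber-side storage

// 1. The publisher sends a command and names the queue on which it expects the reply.
var correlationId = Guid.NewGuid().ToString();
broker.Publish("GenerateCarServiceReports",
    new Envelope(correlationId, ReplyTo: "FinishedServiceReports", Body: "BMW / 3 Series"));

// 2. The subscriber receives the command and saves it instead of replying immediately.
var received = broker.Receive("GenerateCarServiceReports");
pending[received.CorrelationId] = received;

// 3. Later, once the report data exists, the saved command supplies
//    the reply queue and the correlation ID for the response.
var original = pending[correlationId];
pending.Remove(correlationId);
broker.Publish(original.ReplyTo!,
    new Envelope(original.CorrelationId, ReplyTo: null, Body: "TotalCost=5000"));

// 4. The publisher reads the reply and matches it to its original command.
var reply = broker.Receive("FinishedServiceReports");
Console.WriteLine(reply.CorrelationId == correlationId);   // True
Console.WriteLine(reply.Body);                              // TotalCost=5000

// Hypothetical message wrapper: payload plus the metadata needed to reply later.
public record Envelope(string CorrelationId, string? ReplyTo, string Body);

// Hypothetical in-memory stand-in for a broker: queue name -> waiting messages.
public class InMemoryBroker
{
    private readonly Dictionary<string, Queue<Envelope>> _queues = new();

    public void Publish(string queueName, Envelope message)
    {
        if (!_queues.TryGetValue(queueName, out var queue))
        {
            queue = new Queue<Envelope>();
            _queues[queueName] = queue;
        }
        queue.Enqueue(message);
    }

    public Envelope Receive(string queueName) => _queues[queueName].Dequeue();
}
```

The point of the sketch is step 3: everything needed to address the reply travels with the saved command, which is why the subscriber only has to keep the command itself.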

Define Command

Add the following command class to the Examples.RabbitMQ.Domain/Commands directory:

using System;
using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

public class GenerateServiceReport : Command<ServiceReport>
{
    public string Make { get; }
    public string Model { get; }
    public string Year { get; }
    public int Miles { get; }
    
    public DateOnly? DateLastServiced { get; set; }
    public string? Notes { get; set; }

    public GenerateServiceReport(string make, string model, string year, int miles)
    {
        Make = make;
        Model = model;
        Year = year;
        Miles = miles;
    }
}

The above command is defined to return a ServiceReport command as follows:

using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

public class ServiceReport : Command
{
    public string Make { get; }
    public string Model { get; }
    public decimal TotalCost { get; }
    public string[] ServiceItems { get; }

    public ServiceReport(string make, string model, decimal totalCost, string[] serviceItems)
    {
        Make = make;
        Model = model;
        TotalCost = totalCost;
        ServiceItems = serviceItems;
    }
}

Subscribing Microservice

The subscribing microservice defines a queue and a route specifying the method to be called when a command is sent to the queue. Once the handler receives the message, it saves the command until the information required to create the response is available. For this example, the command is simply kept in memory by a repository registered as a singleton. Create the interface defining the repository within the following location: Examples.RabbitMQ.App/Repositories

using Examples.RabbitMQ.Domain.Commands;

namespace Examples.RabbitMQ.App.Repositories;

public interface IPendingAutoServiceRepository
{
    public void Add(GenerateServiceReport serviceRequest);
    public void Remove(string correlationId);
    public GenerateServiceReport? Get(string correlationId);
}

Place the repository implementation within the following location: Examples.RabbitMQ.Infra/Repositories

using System;
using System.Collections.Concurrent;
using Examples.RabbitMQ.App.Repositories;
using Examples.RabbitMQ.Domain.Commands;
using NetFusion.Messaging.Types.Attributes;

namespace Examples.RabbitMq.Infra;

public class PendingAutoServiceRepository : IPendingAutoServiceRepository
{
    private readonly ConcurrentDictionary<string, GenerateServiceReport> _requests = new();

    public void Add(GenerateServiceReport serviceRequest)
    {
        var correlationId = serviceRequest.GetCorrelationId();
        if (string.IsNullOrWhiteSpace(correlationId))
        {
            Console.WriteLine("Request has no correlation identifier.");
            return;
        }

        if (_requests.TryAdd(correlationId, serviceRequest))
        {
            Console.WriteLine("Request saved for future processing.");
        }
    }

    public void Remove(string correlationId)
    {
        if (_requests.TryRemove(correlationId, out var serviceRequest))
        {
            Console.WriteLine($"Service request completed:  {serviceRequest.Make} / {serviceRequest.Model}");
        }
    }

    public GenerateServiceReport? Get(string correlationId)
    {
        if (!_requests.TryGetValue(correlationId, out var serviceRequest))
        {
            Console.WriteLine($"Pending service report for: {correlationId} not found.");
        }

        return serviceRequest;
    }
}

Add the repository to the microservice's dependency-injection container by adding the following to the existing RepositoryModule located within the infrastructure plugin: Examples.RabbitMQ.Infra/Plugin/Modules/RepositoryModule.cs

using Examples.RabbitMQ.App.Repositories;
using Microsoft.Extensions.DependencyInjection;
using NetFusion.Core.Bootstrap.Plugins;

namespace Examples.RabbitMq.Infra.Plugin.Modules;

public class RepositoryModule : PluginModule
{
    public override void RegisterServices(IServiceCollection services)
    {
        services.AddSingleton<IPendingAutoServiceRepository, PendingAutoServiceRepository>();
    }
}

Define the following handler that will inject the above repository and save the received command to the repository:

using System;
using Examples.RabbitMQ.App.Repositories;
using Examples.RabbitMQ.Domain.Commands;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class ServiceGenerationHandler
{
    private readonly IPendingAutoServiceRepository _serviceRepository;

    public ServiceGenerationHandler(IPendingAutoServiceRepository serviceRepository)
    {
        _serviceRepository = serviceRepository;
    }
    
    public void OnGenerateServiceReport(GenerateServiceReport command)
    {
        Console.WriteLine(nameof(OnGenerateServiceReport));
        Console.WriteLine(command.ToIndentedJson());
        
        _serviceRepository.Add(command);
    }
}

Define the following route so that commands arriving on the associated queue are dispatched to the above handler method:

DefineQueue<GenerateServiceReport>(route =>
{
    route.ToConsumer<ServiceGenerationHandler>(
        c => c.OnGenerateServiceReport, 
        queue => queue.QueueName = "GenerateCarServiceReports");
});

Note that unlike the Work Queue - with Response example, the above routing code calls the DefineQueue method and not the DefineQueueWithResponse method. This is because the response will not be returned directly from the message handler.

Publishing Microservice

The publishing microservice routes GenerateServiceReport commands so they are published to the GenerateCarServiceReports queue defined by the subscribing microservice shown above. The route also specifies the reply queue and the message handler to be invoked when responses to commands are later received. Apart from the message types and queue names, this code is identical to that used in the Work Queue - with Response example. Define the following handler:

using System;
using Examples.RabbitMQ.Domain.Commands;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class ServiceReportHandler
{
    public void OnServiceReportReceived(ServiceReport command)
    {
        Console.WriteLine(nameof(OnServiceReportReceived));
        Console.WriteLine(command.ToIndentedJson());
    }
}

Add the following code to the OnDefineEntities method of RabbitMqBusRouter to specify that when a GenerateServiceReport command is sent, it should be delivered to the GenerateCarServiceReports queue, and that responses should be received on the FinishedServiceReports queue and dispatched to the OnServiceReportReceived method defined on ServiceReportHandler.

RouteToQueueWithResponse<GenerateServiceReport, ServiceReport>("GenerateCarServiceReports",
  route => route.ToConsumer<ServiceReportHandler>(
      c => c.OnServiceReportReceived, 
      queue => queue.QueueName = "FinishedServiceReports") );

Define Api Models

Create the following classes within the following directory: Examples.RabbitMQ.WebApi/Models

using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class AutoServiceModel
{
    [Required] public string Make { get; set; } = string.Empty;
    [Required] public string Model { get; set; } = string.Empty;
    [Required] public string Year { get; set; } = string.Empty;
    public int Miles { get; set; }
    public DateOnly? DateLastServiced { get; set; }
    public string? Notes { get; set; }
}

using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class ServiceReportModel
{
    [Required] public string CorrelationId { get; set; } = string.Empty;
    [Required] public string[] RequiredServices { get; set; } = Array.Empty<string>();
    public decimal TotalCost { get; set; }
}

Define Api Controller

namespace Examples.RabbitMQ.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;
    private readonly IQueueResponseService _queueResponse;
    private readonly IPendingAutoServiceRepository _serviceRepository;

    public ExampleController(
        IMessagingService messaging, 
        IQueueResponseService queueResponse,
        IPendingAutoServiceRepository serviceRepository)
    {
        _messaging = messaging;
        _queueResponse = queueResponse;
        _serviceRepository = serviceRepository;
    }

    [HttpPost("auto/services/report")]
    public async Task<IActionResult> GenerateServiceReport([FromBody]AutoServiceModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var command = new GenerateServiceReport(model.Make, model.Model, model.Year, model.Miles)
        {
            Notes = model.Notes
        };

        await _messaging.SendAsync(command);
        return Ok(command.GetCorrelationId());
    }
    
    [HttpPost("auto/services/report/complete")]
    public async Task<IActionResult> CompleteServiceReport([FromBody]ServiceReportModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var pendingRequest = _serviceRepository.Get(model.CorrelationId);
        if (pendingRequest == null)
        {
            return NotFound();
        }

        var report = new ServiceReport(pendingRequest.Make, pendingRequest.Model,
            model.TotalCost,
            model.RequiredServices);


        await _queueResponse.RespondToSenderAsync(pendingRequest, report);
        _serviceRepository.Remove(model.CorrelationId);
        return Ok();
    }
}

The above controller injects the IPendingAutoServiceRepository instance used to load the previously saved command. The CompleteServiceReport method then invokes the RespondToSenderAsync method defined on IQueueResponseService to send the response associated with the command back to the original sending microservice.
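One detail of the controller above: Get and Remove are two separate calls, so two simultaneous completion requests for the same correlation ID could both find the saved command and both send a response. The following is a minimal sketch, assuming that duplicate responses are undesirable, in which the pending command is claimed in a single atomic step using ConcurrentDictionary.TryRemove. The PendingStore type and its TryTake method are hypothetical and are not part of NetFusion or the example solution.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;

var store = new PendingStore<string>();
var id = "25c80232-f43d-46bf-a223-771687faacbb";
store.Add(id, "BMW / 3 Series");

// Only the first caller obtains the saved item; later callers see nothing.
Console.WriteLine(store.TryTake(id, out _));   // True
Console.WriteLine(store.TryTake(id, out _));   // False

// Hypothetical in-memory store keyed by correlation ID.
public class PendingStore<T> where T : class
{
    private readonly ConcurrentDictionary<string, T> _items = new();

    // Returns false if an item with the same correlation ID is already stored.
    public bool Add(string correlationId, T item) => _items.TryAdd(correlationId, item);

    // Removes and returns the item in one atomic operation,
    // so at most one caller can claim a given correlation ID.
    public bool TryTake(string correlationId, [MaybeNullWhen(false)] out T item) =>
        _items.TryRemove(correlationId, out item);
}
```

The trade-off is that a claimed command is gone from the store before the response is sent, so a caller using this approach would need to add the command back if sending the response fails.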

Execute Example

Complete the following steps to run the example microservice and send HTTP POST requests to the example controller:

Execute Microservice

cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run

Send Requests

Post the following request to: http://localhost:5000/api/example/auto/services/report

This posts the initial command requesting a service report, which will not be completed until sometime in the future.

{
    "make": "BMW",
    "model": "3 Series",
    "year": "2002",
    "miles": 140000,
    "notes": "Dam thing will not start."
}

IMAGE

The following shows the initial received request that is saved by the subscribing microservice:

IMAGE

Next, a request is made to the WebApi controller with the results of the inspection. This results in the original command being loaded from the repository and its corresponding reply command being sent back to the originating microservice:

Post the following request to: http://localhost:5000/api/example/auto/services/report/complete

{
    "correlationId": "25c80232-f43d-46bf-a223-771687faacbb",
    "requiredServices": [
        "New tires",
        "New lower tie rods",
        "New engine gaskets"
    ],
    "totalCost": 5000
}

IMAGE

IMAGE
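The two requests above can also be sent from code. The following is a minimal sketch of a console client, assuming the WebApi is listening on http://localhost:5000 as in the URLs above and that the first endpoint returns the correlation ID as its response body. The one-second delay is an arbitrary assumption that gives the subscriber time to receive and save the command; if the completion request arrives before the command is saved, the controller returns 404.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;

// Assumption: the example WebApi is running locally on port 5000.
using var client = new HttpClient { BaseAddress = new Uri("http://localhost:5000/") };

// 1. Submit the service request. Property names match the JSON shown above.
var request = new
{
    make = "BMW",
    model = "3 Series",
    year = "2002",
    miles = 140000,
    notes = "Dam thing will not start."
};
var submitResponse = await client.PostAsJsonAsync("api/example/auto/services/report", request);
submitResponse.EnsureSuccessStatusCode();

// The controller returns the correlation ID; surrounding quotes are trimmed
// in case the server encodes the string as JSON rather than plain text.
var correlationId = (await submitResponse.Content.ReadAsStringAsync()).Trim('"');
Console.WriteLine($"Correlation ID: {correlationId}");

// Arbitrary pause so the subscriber can receive and save the command.
await Task.Delay(TimeSpan.FromSeconds(1));

// 2. Complete the report using the correlation ID returned by the first call.
var completion = new
{
    correlationId,
    requiredServices = new[] { "New tires", "New lower tie rods", "New engine gaskets" },
    totalCost = 5000
};
var completeResponse = await client.PostAsJsonAsync(
    "api/example/auto/services/report/complete", completion);
Console.WriteLine($"Completion status: {(int)completeResponse.StatusCode}");
```

Because the correlation ID is generated per command, the value used in the second request must come from the first response rather than being copied from the sample JSON above.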
