
integration rabbitmq rpc

Brian Greco edited this page Mar 18, 2023 · 3 revisions

IMAGE RabbitMQ: RPC

Unlike the previously discussed exchanges and queues, which are completely asynchronous, the RPC message style requires the publisher sending the command to wait until the RPC command response is returned. However, the publisher can perform other non-dependent tasks while waiting for the response.
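
As a minimal sketch of that last point, the following plain .NET program starts a request, does unrelated work, and only then awaits the reply. SendTaxCommandAsync is a hypothetical stand-in that simulates the round trip with a delay; it is not part of NetFusion.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for sending an RPC command; the delay simulates the broker round trip.
static async Task<decimal> SendTaxCommandAsync()
{
    await Task.Delay(TimeSpan.FromMilliseconds(200));
    return 3_000m;
}

// Start the request without awaiting it yet.
Task<decimal> pendingReply = SendTaxCommandAsync();

// Perform work that does not depend on the reply.
int auditEntries = 0;
for (int i = 0; i < 3; i++)
{
    auditEntries++;
}

// Wait only at the point where the result is actually needed.
decimal tax = await pendingReply;
Console.WriteLine($"Audit entries: {auditEntries}, tax: {tax}");
```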

Overview

This message pattern is more complex since it uses two queues. One queue, defined by the subscriber, receives command requests. The other queue is created dynamically by the publisher and receives the replies to sent commands. By default, the client waits 5 seconds for a reply to a command. A different timeout can be specified in the application's configuration settings or when defining the route.
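
The timeout behavior can be illustrated with plain .NET tasks. This is only a sketch of the idea, not how NetFusion implements it: SimulateReply is a hypothetical stand-in for the subscriber, and the second call uses a shortened timeout so the example finishes quickly.

```csharp
using System;
using System.Threading.Tasks;

// Default taken from the text above; the real client reads it from configuration or the route.
var defaultTimeout = TimeSpan.FromSeconds(5);

// Hypothetical stand-in for a subscriber that replies after the given delay.
static async Task<decimal> SimulateReply(TimeSpan delay, decimal amount)
{
    await Task.Delay(delay);
    return amount;
}

// A reply that arrives well within the timeout is returned normally.
decimal fast = await SimulateReply(TimeSpan.FromMilliseconds(50), 3_000m)
    .WaitAsync(defaultTimeout);
Console.WriteLine($"Reply received: {fast}");

// A reply that arrives too late surfaces as a TimeoutException on the waiting side.
bool timedOut = false;
try
{
    await SimulateReply(TimeSpan.FromSeconds(2), 20_500m)
        .WaitAsync(TimeSpan.FromMilliseconds(200)); // shortened for the demo
}
catch (TimeoutException)
{
    timedOut = true;
}
Console.WriteLine($"Timed out: {timedOut}");
```

Task.WaitAsync requires .NET 6 or later. It stops the wait but does not cancel the work producing the reply.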

The service processing the command defines the name of the queue on which it will receive commands. The publisher then sends a command to the queue by setting the route-key of the command to the name of the queue. A command is sent to a work-queue in the same way, but for an RPC command the publisher also specifies a message-namespace.


A message-namespace is a string used to uniquely identify the action associated with the command within the queue. When defining an RPC command message handler, the subscriber can use the same queue name for multiple command types having different namespaces.


This allows the subscriber to group related commands into the same queue. When a request arrives in the queue, the message-namespace determines which handler processes the command. Queues are used efficiently because a group of related commands, identified by namespace, is processed on a single queue.
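
Conceptually, dispatching by namespace within one queue is a lookup from message-namespace to handler. The sketch below assumes a simple dictionary and string payloads purely for illustration. The namespaces and tax amounts mirror the example commands on this page, while the handlers table and the Dispatch function are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical handler table for a single queue: message-namespace -> handler.
var handlers = new Dictionary<string, Func<string, decimal>>
{
    ["Accounting.AutoTax"] = zipCode => zipCode == "06410" ? 1_000m : 200m,
    ["Accounting.PropertyTax"] = state => state == "CT" ? 20_500m : 3_000m
};

// Selects the handler using the namespace tagged on the incoming message.
decimal Dispatch(string messageNamespace, string payload)
{
    if (!handlers.TryGetValue(messageNamespace, out var handler))
    {
        throw new InvalidOperationException(
            $"No handler registered for namespace '{messageNamespace}'.");
    }

    return handler(payload);
}

decimal autoTax = Dispatch("Accounting.AutoTax", "06410");
decimal propertyTax = Dispatch("Accounting.PropertyTax", "PA");
Console.WriteLine($"Auto tax: {autoTax}, property tax: {propertyTax}");
```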


The publisher configures the queue and message-namespace for each command to be used when sending a command to the subscribing microservice. When a command is published, the message is sent to the specified queue and tagged with the namespace.


Also, the message id is set if not already present on the command. For each RPC command queue, a pending-request task is created that listens for replies to commands. When a reply is received, the message id of the response message is used to find the pending-request task associated with the original command. Based on the state of the response message, the task is either completed or placed in an error state. If the response message is marked as an error, the task is configured to throw an exception using the error information contained in the response message.
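
The correlation step can be sketched with a dictionary of TaskCompletionSource instances keyed by message id. This is an illustrative model under simplifying assumptions (string bodies, an isError flag), not NetFusion's internal implementation. RegisterRequest and OnReply are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Pending requests keyed by the message id of the original command.
var pending = new ConcurrentDictionary<string, TaskCompletionSource<string>>();

// Publisher side: record a pending request and hand back the task the caller awaits.
Task<string> RegisterRequest(string messageId)
{
    var source = new TaskCompletionSource<string>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    pending[messageId] = source;
    return source.Task;
}

// Reply side: use the message id to find the pending request, then complete or fault it.
void OnReply(string messageId, string body, bool isError)
{
    if (!pending.TryRemove(messageId, out var source))
    {
        return; // no matching request, for example one that was already abandoned
    }

    if (isError)
    {
        source.SetException(new InvalidOperationException(body));
    }
    else
    {
        source.SetResult(body);
    }
}

Task<string> autoTaxReply = RegisterRequest("msg-1");
Task<string> propertyTaxReply = RegisterRequest("msg-2");

// Replies may arrive in any order; the message id ties each one to its request.
OnReply("msg-2", "Tax service unavailable", isError: true);
OnReply("msg-1", "1000", isError: false);

string autoTaxBody = await autoTaxReply;

string failure = "";
try
{
    await propertyTaxReply;
}
catch (InvalidOperationException ex)
{
    failure = ex.Message;
}

Console.WriteLine($"Auto tax reply: {autoTaxBody}; property tax error: {failure}");
```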

IMAGE

Define RPC Commands

The following two commands will be published to the same RPC queue and differentiated based on their assigned message-namespace. The commands calculate auto and property taxes and both return the same response type. Define the following response class and the two commands within the following directory: Examples.RabbitMQ.Domain/Commands

using System;

namespace Examples.RabbitMQ.Domain.Commands;

public class TaxCalc
{
    public decimal Amount { get; }
    public DateTime DateCalculated { get; }

    public TaxCalc(decimal amount, DateTime dateCalculated)
    {
        Amount = amount;
        DateCalculated = dateCalculated;
    }
}

using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

[MessageNamespace("Accounting.AutoTax")]
public class CalculateAutoTax : Command<TaxCalc>
{
    public string Vin { get; } 
    public string ZipCode { get; }

    public CalculateAutoTax(string vin, string zipCode)
    {
        Vin = vin;
        ZipCode = zipCode;
    }
}

using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

[MessageNamespace("Accounting.PropertyTax")]
public class CalculatePropertyTax : Command<TaxCalc>
{
    public string Address { get; }
    public string City { get; }
    public string State { get; }
    public string Zip { get; }

    public CalculatePropertyTax(string address, string city, string state, string zip)
    {
        Address = address;
        City = city;
        State = state;
        Zip = zip;
    }
}

Subscribing Microservice

The subscribing microservice defines the queue on which it will receive incoming RPC requests.

Add the following handler to the Examples.RabbitMQ/App/Handlers directory:

using System;
using System.Threading;
using System.Threading.Tasks;
using Examples.RabbitMQ.Domain.Commands;

namespace Examples.RabbitMQ.App.Handlers;

public class TaxCalculationHandler
{
    public async Task<TaxCalc> CalculatePropertyTax(CalculatePropertyTax command, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        Console.WriteLine(command.State);

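        // Simulate a slow calculation: 8 seconds exceeds the default
        // 5-second reply timeout, which exercises the timeout path.
        if (command.State == "CT")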
        {
            await Task.Delay(TimeSpan.FromSeconds(8), cancellationToken);
        }
        
        return command.State == "CT" ? new TaxCalc(20_500, DateTime.UtcNow)
            : new TaxCalc(3_000, DateTime.UtcNow);
    }
    
    public TaxCalc CalculateAutoTax(CalculateAutoTax command)
    {
        Console.WriteLine(command.ZipCode);

        return command.ZipCode == "06410" ? new TaxCalc(1_000, DateTime.UtcNow)
            : new TaxCalc(200, DateTime.UtcNow);
    }
}

Add the following to Examples.RabbitMQ.Infra/Routers/RabbitMqBusRouter.cs to define the RPC queue:

DefineRpcQueue("TaxCalculations", meta => { meta.PrefetchCount = 5; });

Next, add the following routes to dispatch the two commands based on their message-namespaces to their corresponding handler methods:

DefineRpcQueueRoute<CalculateAutoTax, TaxCalc>("TaxCalculations", route =>
{
    route.ToConsumer<TaxCalculationHandler>(c => c.CalculateAutoTax);
});

DefineRpcQueueRoute<CalculatePropertyTax, TaxCalc>("TaxCalculations", route =>
{
    route.ToConsumer<TaxCalculationHandler>(c => c.CalculatePropertyTax);
});

The above completes the following:

  • Each RPC command has its associated message-namespace specified using the MessageNamespace attribute.
  • The handler is the same as all other handlers, but the command type and its associated message-namespace are used to determine which handler method is called.
  • A queue named TaxCalculations, on which the commands will be received, is defined with a pre-fetch count of 5.
  • The DefineRpcQueueRoute method is called to specify the handler methods to be invoked when each type of command is received.
  • NOTE: The message-namespace can be specified either with the MessageNamespace attribute or as a parameter when calling the DefineRpcQueueRoute method. If specified in both places, the value passed as the parameter is used.

Publishing Microservice

The publishing microservice specifies the RPC queue to which each command should be sent and its associated message-namespace. As with the subscriber, the message-namespace can be specified using the MessageNamespace attribute or directly when calling the RouteToRpcQueue method below. Add the following two routes:

RouteToRpcQueue<CalculateAutoTax>("TaxCalculations");
RouteToRpcQueue<CalculatePropertyTax>("TaxCalculations");

NOTE: The publishing microservice does not specify a message handler class, since the response is returned directly from the SendAsync method of IMessagingService when sending the command:

var command = new CalculatePropertyTax(model.Address, model.City, model.State, model.Zip);
var result = await _messaging.SendAsync(command);

Define Api Model

Add the following two models to the following directory: Examples.RabbitMQ.WebApi/Models

using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class AutoTaxModel
{
    [Required]
    public string Vin { get; set; }  = string.Empty;
    
    [Required]
    public string ZipCode { get; set; }  = string.Empty;
}

using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class PropertyTaxModel
{
    [Required]
    public string Address { get; set; } = string.Empty;
    
    [Required]
    public string City { get; set; } = string.Empty;
    
    [Required]
    public string State { get; set; } = string.Empty;
    
    [Required]
    public string Zip { get; set; } = string.Empty;
}

Define Api Controller

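using System.Threading.Tasks;
using Examples.RabbitMQ.Domain.Commands;
using Examples.RabbitMQ.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging; // IMessagingService (namespace assumed)

namespace Examples.RabbitMQ.WebApi.Controllers;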

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public ExampleController(
        IMessagingService messaging)
    {
        _messaging = messaging;
    }
  
    [HttpPost("calculations/auto/tax")]
    public async Task<IActionResult> CalculateAutoTax([FromBody] AutoTaxModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var command = new CalculateAutoTax(model.Vin, model.ZipCode);
        var result = await _messaging.SendAsync(command);
        return Ok(result);
    }

    [HttpPost("calculations/property/tax")]
    public async Task<IActionResult> CalculatePropertyTax([FromBody] PropertyTaxModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var command = new CalculatePropertyTax(model.Address, model.City, model.State, model.Zip);
        var result = await _messaging.SendAsync(command);
        return Ok(result);
    }
}

Execute Example

Complete the following to run the example microservice and send HTTP POST requests to the example controller:

Execute Microservice

cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run

Send Requests

Post the following request to: http://localhost:5000/api/example/calculations/auto/tax

{
    "vin": "VW32423DFDFG23423",
    "zipCode": "06410"
}

IMAGE

Post the following request to: http://localhost:5000/api/example/calculations/property/tax

{
    "address": "334 5th Avenue",
    "city": "New Kensington",
    "state": "PA",
    "zip": "15068"
}

IMAGE

RabbitMQ Configuration

The following shows the queue created by the subscribing microservice on which the RPC commands are received (green arrow). The reply queue on which the publishing microservice receives its replies is indicated by the blue arrow.

IMAGE
