Skip to content

integration rabbitmq direct

Brian Greco edited this page Mar 18, 2023 · 3 revisions

IMAGE RabbitMQ: Direct Exchanges

A direct message exchange is based on a string value comparison used to determine the associated queues to which the message should be delivered. Domain-Events are published to direct exchanges and are often used for microservice integration.

Note that this is very similar to a topic exchange but the string value is not based on a pattern comparison. A domain-event will be delivered to a queue if a binding route-key exactly matches the route-key associated with the published message.

Overview

Domain events published to direct exchanges are often used for microservice integrations. The publisher defines the exchange to which a given domain-event will be published. However, the publisher does not know of all the consumers of the exchange. The consumers, knowing the exchange name, defines queues to which events will be received matching specified route-keys. If a consumer is not executing, the messages will be queued and delivered the next time the subscriber starts.


IMAGEThe message publisher specifies a RouteKey containing a simple string value based on an agreed convention between the publisher and subscriber.


For example, the following could be a RouteKey for a domain-event used to update other microservices that an auto sale has been completed for a given make:

IMAGE

Define Domain-Event

Add the following domain-event class to the Examples.RabbitMQ.Domain/Events directory:

using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Events;

public class PropertySold : DomainEvent
{
    public string Address { get; }
    public string City { get; }
    public string State { get; }
    public string Zip { get; }
    public decimal AskingPrice { get; init; }
    public decimal SoldPrice { get; init; }

    public PropertySold(string address, string city, string state, string zip)
    {
        Address = address;
        City = city;
        State = state;
        Zip = zip;
    }
}

Publishing Microservice

Add the following code to the RabbitMqBusRouter OnDefineEntities method to specify that a PropertySold domain-event should be delivered to the RealEstate direct exchange when published:

DefineExchange<PropertySold>(exchange =>
{
    exchange.ExchangeType = ExchangeType.Direct;
    exchange.ExchangeName = "RealEstate";
    exchange.RouteKey(e => e.State);
    exchange.WhenDomainEvent(e => e.Zip != "15068");
});

The above routing completes the following:

  • Creates a direct exchange named RealEstate
  • Specifies that the value of the state property should be used as the route key
  • Specifies a predicate so any published domain-event with a zip code of 15068 will not be delivered to the exchange.

Subscribing Microservice

The subscribing microservice defines a route, specifying the method to be called, when a domain-event is published to the exchange having a matching routing key(s). Within Examples.RabbitMQ.App/Handlers, define the following handler

using System;
using System.Threading;
using System.Threading.Tasks;
using Examples.RabbitMQ.Domain.Events;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class RealEstateHandler
{
    public async Task OnNorthEastProperty(PropertySold domainEvent, CancellationToken token)
    {
        Console.WriteLine(nameof(OnNorthEastProperty));
        
        await Task.Delay(TimeSpan.FromMilliseconds(1), token);
        Console.WriteLine(domainEvent.ToIndentedJson());
        
        token.ThrowIfCancellationRequested();
    }
    
    public void OnSouthEastProperty(PropertySold domainEvent, CancellationToken token)
    {
        Console.WriteLine(nameof(OnSouthEastProperty));
        Console.WriteLine(domainEvent.ToIndentedJson());
    }
}

Define the following routes to dispatch the received domain-events to the above handler methods when received on a specific queue:

protected override void OnDefineEntities()
{
    SubscribeToExchange<PropertySold>("RealEstate", 
        new []{ "CT", "NY", "NH", "ME" }, 
        route => route.ToConsumer<RealEstateHandler>(
            c => c.OnNorthEastProperty,
            queue =>
            {
                queue.QueueName = "NorthEastProperties";
            }));

    SubscribeToExchange<PropertySold>("RealEstate", 
        new []{ "NC", "SC", "FL" }, 
        route => route.ToConsumer<RealEstateHandler>(
            c => c.OnSouthEastProperty,
            queue =>
            {
                queue.QueueName = "SouthEastProperties";
            }));
}

Define Api Model

Add the model to the following directory: Examples.RabbitMQ.WebApi/Models

using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class PropertySoldModel
{
    [Required] public string Address { get; set; } = string.Empty;
    [Required] public string City { get; set; } = string.Empty;
    [Required] public string State { get; set; } = string.Empty;
    [Required] public string Zip { get; set; } = string.Empty;
    
    public decimal AskingPrice { get; init; }
    public decimal SoldPrice { get; init; }
}

Define Api Controller

Add the controller to the following directory: Examples.RabbitMQ.WebApi/Controllers

using Examples.RabbitMQ.Domain.Events;
using Examples.RabbitMQ.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging;

namespace Examples.RabbitMq.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public ExampleController(IMessagingService messaging)
    {
        _messaging = messaging;
    }

    [HttpPost("properties/sold")]
    public async Task<IActionResult> PropertySold(PropertySoldModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var domainEvent = new PropertySold(model.Address, model.City, model.State, model.Zip)
        {
            AskingPrice = model.AskingPrice,
            SoldPrice = model.SoldPrice
        };

        await _messaging.PublishAsync(domainEvent);
        return Ok();
    }
}

Execute Example

Complete the following to run the example microservice and send a HTTP Post request to the example controller:

Execute Microservice

cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run

Send Requests

Post the following requests to: http://localhost:5000/api/example/properties/sold

{
    "address": "444 Main Street",
    "city": "Cheshire",
    "state": "CT",
    "zip": "06410",
    "askingPrice": 560000,
    "soldPrice": 555000
}

IMAGE

IMAGE

{
    "address": "143 West Franklyn Street",
    "city": "Chapel Hill",
    "state": "NC",
    "zip": "27516",
    "askingPrice": 460000,
    "soldPrice": 455000
}

IMAGE

MAGE

  • When a domain-event is published for the state of CT, the log shows the message was delivered to the NorthEastProperties queue.
  • Likewise, when a domain-event is published for the state of NC, the log shows the message was delivered to the SouthEastProperties queue.

RabbitMQ Configurations

Since the IsAutoCreateEnabled configuration property was set to true within the Program.cs file when bootstrapping the microservice, the specified exchange and queues are automatically created:

IMAGE

IMAGE

IMAGE

Notes

  • If a message is published for which there are no queues matching the RouteKey, the message is discarded.
  • When multiple instances of the same microservice are executing, the messages are delivered round-robin.
  • Multiple route-keys can be specified for a single handler/queue binding.
  • The route keys can be specified externally within the application configuration.
Clone this wiki locally