
integration rabbitmq workqueue

Brian Greco edited this page Mar 18, 2023 · 3 revisions

IMAGE RabbitMQ: Work Queue

Unlike the previously discussed exchanges and queues, work queues do not involve an explicitly defined exchange.


IMAGE In RabbitMQ terms, work queues are created on the default exchange. Also, commands, rather than domain-events, are sent to work queues.


Unlike the Direct, Topic, and Fan-Out exchanges, where the consumers are not directly known, the publisher of a command knows the specific consumer that will process it. Where exchanges allow one microservice to integrate with another by publishing domain-events, a command explicitly tells a microservice to take an action. Commands are sent to work queues to perform longer-running tasks that can be completed asynchronously.

Overview

The route-key specified as a message property is used to determine the queue to which the command is sent. A given work-queue can only be associated with a single type of command.
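The routing rule described above can be illustrated with a small in-memory sketch. It does not use RabbitMQ or NetFusion; it only simulates the default-exchange behavior in which a message is placed on the queue whose name equals its route-key. The `Publish` helper and the `GenerateReports` queue are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a broker: queue name -> pending messages.
// This only simulates the routing rule; it does not talk to RabbitMQ.
var queues = new Dictionary<string, Queue<string>>
{
    ["ProcessEmails"] = new Queue<string>(),
    ["GenerateReports"] = new Queue<string>()   // hypothetical second work queue
};

// Default-exchange rule: deliver to the queue whose name equals the route-key.
// Returns false when no queue with that name exists.
bool Publish(string routeKey, string message)
{
    if (!queues.TryGetValue(routeKey, out var queue))
    {
        return false;
    }
    queue.Enqueue(message);
    return true;
}

bool routed = Publish("ProcessEmails", "SendEmail: Out of Coffee");
bool routedUnknown = Publish("UnknownQueue", "SendEmail: Lost");

Console.WriteLine($"ProcessEmails holds {queues["ProcessEmails"].Count} message(s)");
Console.WriteLine($"GenerateReports holds {queues["GenerateReports"].Count} message(s)");
Console.WriteLine($"Known route-key routed: {routed}, unknown route-key routed: {routedUnknown}");
```

Only the queue named by the route-key receives the message; the other queue stays empty, which is why each work queue can be dedicated to a single command type.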

IMAGE

Define Command

Add the following command class to the Examples.RabbitMQ.Domain/Commands directory:

using NetFusion.Messaging.Types;

namespace Examples.RabbitMQ.Domain.Commands;

public class SendEmail : Command
{
    public string Subject { get; }
    public string FromAddress { get; }
    public string[] ToAddresses { get; }
    public string Message { get; }

    public SendEmail(string subject, string fromAddress, string[] toAddresses, string message)
    {
        Subject = subject;
        FromAddress = fromAddress;
        ToAddresses = toAddresses;
        Message = message;
    }
}

Subscribing Microservice

The subscribing microservice defines a queue and a route specifying the method to be called when a command is sent to the queue. Add the following handler class to the Examples.RabbitMQ.App/Handlers directory:

using System;
using Examples.RabbitMQ.Domain.Commands;
using NetFusion.Common.Extensions;

namespace Examples.RabbitMQ.App.Handlers;

public class EmailHandler
{
    public void OnSendEmail(SendEmail command)
    {
        Console.WriteLine(nameof(OnSendEmail));
        Console.WriteLine(command.ToIndentedJson());
    }
}

Define the following route to dispatch a command received on the associated queue to the above handler method:

protected override void OnDefineEntities()
{   
    DefineQueue<SendEmail>(route =>
    {
        route.ToConsumer<EmailHandler>(c => c.OnSendEmail, queue =>
        {
            queue.QueueName = "ProcessEmails";
        });
    });
}

The above routing does the following:

  • Defines a queue named ProcessEmails.
  • Dispatches each command received on the ProcessEmails queue to the OnSendEmail method of the EmailHandler class.

Publishing Microservice

Add the following code to the OnDefineEntities method of RabbitMqBusRouter so that a sent SendEmail command is delivered to the ProcessEmails queue:

protected override void OnDefineEntities()
{
    RouteToQueue<SendEmail>("ProcessEmails");
}

Define Api Model

Add the corresponding model to the Examples.RabbitMQ.WebApi/Models directory:

using System; // Array.Empty (needed when implicit usings are disabled)
using System.ComponentModel.DataAnnotations;

namespace Examples.RabbitMQ.WebApi.Models;

public class SendEmailModel
{
    [Required] public string Subject { get; set; } = string.Empty;
    [Required] public string FromAddress { get; set; } = string.Empty;
    [Required] public string[] ToAddresses { get; set; } = Array.Empty<string>();
    [Required] public string Message { get; set; } = string.Empty;
}

Define Api Controller

Add the following controller to the Examples.RabbitMQ.WebApi/Controllers directory:

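using System.Threading.Tasks;
using Examples.RabbitMQ.Domain.Commands;
using Examples.RabbitMQ.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging; // assumed namespace of IMessagingService

namespace Examples.RabbitMQ.WebApi.Controllers;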

[ApiController, Route("api/[controller]")]
public class ExampleController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public ExampleController(
        IMessagingService messaging)
    {
        _messaging = messaging;
    }
  
    [HttpPost("send/email")]
    public async Task<IActionResult> SendEmail(SendEmailModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        var command = new SendEmail(model.Subject, model.FromAddress, model.ToAddresses, model.Message);

        await _messaging.SendAsync(command);
        return Ok();
    }
}

Execute Example

Complete the following to run the example microservice and send an HTTP POST request to the example controller:

Execute Microservice

cd ./Examples.RabbitMQ/src/Examples.RabbitMQ.WebApi/
dotnet run

Send Requests

Post the following request to: http://localhost:5000/api/example/send/email

{
    "subject": "Out of Coffee",
    "fromAddress": "john.smith@gmail.com",
    "toAddresses": ["amy.white@gmail.com", "mark.smith@yahoo.com"],
    "message": "More is needed ASAP!"
}
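For readers who prefer to send the request from code rather than from an HTTP client tool, the following is a minimal sketch using HttpClient from the .NET base class library. It assumes the microservice is listening on the URL given above; if it is not running, the failure is caught and printed instead of crashing.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Payload mirroring the JSON document shown above.
var payload = new
{
    subject = "Out of Coffee",
    fromAddress = "john.smith@gmail.com",
    toAddresses = new[] { "amy.white@gmail.com", "mark.smith@yahoo.com" },
    message = "More is needed ASAP!"
};
string json = JsonSerializer.Serialize(payload);

using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
using var content = new StringContent(json, Encoding.UTF8, "application/json");

try
{
    // URL taken from the example above; adjust the port if the service listens elsewhere.
    using HttpResponseMessage response =
        await client.PostAsync("http://localhost:5000/api/example/send/email", content);
    Console.WriteLine($"Status: {(int)response.StatusCode}");
}
catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
{
    // Raised when the microservice is not running or does not answer in time.
    Console.WriteLine($"Request failed: {ex.Message}");
}
```

A successful call should cause the subscribing microservice to log the received command through the OnSendEmail handler.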

IMAGE

IMAGE

RabbitMQ Configurations

IMAGE

Notes

  • If a command is sent while no consumers are connected, the message is stored in the queue.
  • The command message is sent to the queue with the name matching the message's route-key value.
  • Command messages are delivered round-robin to the running consumers.
  • Work-queues are defined on the default exchange.
  • When the publisher sends a command, the command message's route-key is automatically set.
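
The first and third notes can be illustrated with a small in-memory simulation. This is a sketch under simplifying assumptions, not RabbitMQ's actual dispatch logic (which also depends on prefetch and acknowledgement settings); the worker names and the `Drain` helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// In-memory simulation only: one work queue and a list of connected consumers.
var pending = new Queue<string>();
var consumers = new List<string>();                       // hypothetical consumer names
var delivered = new List<(string Consumer, string Command)>();
int next = 0;                                             // index of the consumer to use next

// Hand queued commands to consumers in turn; with no consumers they stay queued.
void Drain()
{
    while (consumers.Count > 0 && pending.Count > 0)
    {
        string consumer = consumers[next % consumers.Count];
        delivered.Add((consumer, pending.Dequeue()));
        next++;
    }
}

// Commands sent before any consumer connects are stored in the queue.
pending.Enqueue("email-1");
pending.Enqueue("email-2");
pending.Enqueue("email-3");
Drain();
Console.WriteLine($"No consumers: {pending.Count} stored, {delivered.Count} delivered");

// Once two consumers connect, the stored commands alternate between them.
consumers.Add("worker-A");
consumers.Add("worker-B");
Drain();
foreach (var (consumer, command) in delivered)
{
    Console.WriteLine($"{consumer} <- {command}");
}
```

In this sketch the three stored commands go to worker-A, worker-B, and worker-A in that order, which is the alternating pattern the round-robin note describes.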