Библиотека для работы с RabbitMq. Главные сделанные выборы:
- Получение и публикация сообщений происходит через собственные объекты RabbitMqMessage и ReceivedRabbitMqMessage
- Контент передаётся через
HttpContent
- Очередь ошибок есть всегда, в случае исключения сохраняется текст исключения
- Встроенная поддержка трассировки
- Встроенная поддержка метрик через активити сурц
- Очереди нужно явно декларировать, для поддержки миграторов
IRabbtiMqСlient вдохновлён HttpClient'ом.
- Опустошать очередь
- Получать количество сообщений в очереди
- Получать сообщения из очереди по одному
- Завершать обработку полученного сообщения одним из 4-х возможных исходов
- Ack
- NackWithRequeue
- NackWithoutRequeue
- Error
- Подписываться на сообщения очереди и возвращать один из 4-х возможных результатов
- Ack
- NackWithRequeue
- NackWithoutRequeue
- Error
- Публиковать сообщения в очередь
- Создание очереди
- Проверка существования очереди
- Удаление очереди
- Создание обменника
- Проверка существования обменника
- Удаление обменника
- Связывание очереди и обменника
Библиотекой можно пользоваться "как есть", но лучше использовать отдельные клиенты для каждого сервиса с которым происходит интеграция.
Клиент — инкапсулирует в себе:
- DTO
- название обменников и routing
- логику сериализации/десериализации DTO
(+) Я как создатель сервиса, который владеет контрактом очереди, должен предоставлять клиента для отправки ему сообщений, и подписки на события от него
(-) Я как пользователь клиента могу изменить исходящее сообщение
(+) Я как пользователь клиента могу задавать очередь, через которую получаю события, и её настройки
(+) Я как пользователь клиента могу задать свою стратегию обработки полученных сообщений
(+) Я как создатель клиента, должен понимать, что должно быть в клиенте, а чего не должно быть (название обменников, ключей, дто, формат)
(+) Я как пользователь библиотеки, могу написать клиента для тех случаев, когда его нет по какой либо причине.
(-) Я как пользователь библиотеки, при подписке настраиваю все необходимые для обработки сообщения очереди. Библиотека автоматически создаст всё настроенное при появлении коннекта и пересоздаст при его восстановлении после обрыва
(-) Я как автор клиента имею готовую инфраструктуру для реализации RPC
Create queue in hosted service:
public class QueueHostedService : IHostedService
{
private readonly IRabbitMqClient _rabbitMqClient;
public QueueHostedService(
IRabbitMqClient rabbitMqClient)
{
_rabbitMqClient = rabbitMqClient;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _rabbitMqClient.CreateQueueAsync("app.sample_dto", o => o.AsDurable(true), cancellationToken: cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
Send message:
public async Task SendMessageAsync(int id)
{
var sampleDto = new SampleDto
{
Id = id,
Name = "name"
};
await _rabbitMqClient.PublishAsJsonAsync(exchangeName: null,
routingKey: "app.sample_dto",
model: sampleDto);
}
Register handler in hosted service:
public class QueueHostedService : BackgroundService
{
private readonly IRabbitMqClient _rabbitMqClient;
private readonly SampleDtoHandler _sampleDtoHandler;
public QueueHostedService(
IRabbitMqClient rabbitMqClient,
SampleDtoHandler sampleDtoHandler)
{
_rabbitMqClient = rabbitMqClient;
_sampleDtoHandler = sampleDtoHandler;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _rabbitMqClient.SubscribeAsJson<SampleDto>("app.sample_dto", OnMessage)
.WithDeclareErrorQueue(o => o.AsDurable(true))
.WithConstantTimeoutRetryStrategy(TimeSpan.FromSeconds(5), 5, o => o.AsDurable(true))
.StartAsync(stoppingToken);
}
private async Task<ConsumeResult> OnMessage(SampleDto? dto, CancellationToken cancellationToken)
{
if (dto == null)
throw new InvalidOperationException("content is null");
await _sampleDtoHandler.HandleAsync(dto);
return ConsumeResult.Ack;
}
}
Abstractions for Byndyusoft.Messaging.RabbitMq.
dotnet add package Byndyusoft.Messaging.RabbitMq.Abstractions
Core implementation for Byndyusoft.Messaging.RabbitMq.
dotnet add package Byndyusoft.Messaging.RabbitMq.Core
EasyNetQ implementation for Byndyusoft.Messaging.RabbitMq.
dotnet add package Byndyusoft.Messaging.RabbitMq
OpenTelemetry support for Byndyusoft.Messaging.RabbitMq.
dotnet add package Byndyusoft.Messaging.RabbitMq.OpenTelemetry
To contribute, you will need to setup your local environment, see prerequisites. For the contribution and workflow guide, see package development lifecycle.
A detailed overview on how to contribute can be found in the contributing guide.
Make sure you have installed all of the following prerequisites on your development machine:
- Git - Download & Install Git. OSX and Linux machines typically have this already installed.
- .NET Core (version 6.0 or higher) - Download & Install .NET Core.
- RabbitMq Download & Install.
- source code
- unit-tests
- example console application
- Implement package logic in
src
- Add or addapt unit-tests (prefer before and simultaneously with coding) in
tests
- Add or change the documentation as needed
- Open pull request in the correct branch. Target the project's
master
branch