Skip to content

Commit

Permalink
feat(server-abstractions,persistence,protocols,server): ensure queues…
Browse files Browse the repository at this point in the history
… not on every publish
  • Loading branch information
thomashilzendegen authored and gingters committed Jul 2, 2024
1 parent 779fb29 commit 50d51fd
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,18 @@ public interface ITenantService
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is
/// <see cref="P:System.Threading.CancellationToken.None"/>.
///</param>
/// <returns>A <see cref="Page{Tenant}"/> representing the asynchronous loading of a page of <see cref="Tenant"/>s.</returns>
/// <returns>A <see cref="Task"/> representing the asynchronous operation, which wraps the <see cref="Page{Tenant}"/>.</returns>
Task<Page<Tenant>> LoadAllTenantsPagedAsync(int skip, int take, CancellationToken cancellationToken = default);

/// <summary>
/// Loads all tenant names.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is
/// <see cref="P:System.Threading.CancellationToken.None"/>.
///</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation, which wraps the list of tenant names.</returns>
Task<string[]> LoadAllTenantNamesAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Creates a new <see cref="Tenant"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public async Task<Page<Tenant>> LoadAllTenantsPagedAsync(int skip, int take, Can
.OrderBy(t => t.NormalizedName)
.ToPagedResultAsync(skip, take, cancellationToken);

/// <inheritdoc />
public Task<string[]> LoadAllTenantNamesAsync(CancellationToken cancellationToken = default)
=> _dbContext.Tenants.Select(t => t.Name).ToArrayAsync(cancellationToken);

/// <inheritdoc />
public async Task CreateTenantAsync(Tenant tenant, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal partial class DisposableConsumer : IDisposable
/// <param name="durable">The queue should survive a broker restart.</param>
/// <param name="autoDelete">The queue should be deleted when the last consumer goes away.</param>
public DisposableConsumer(ILogger logger, IModel model, string queueName, bool autoAck = true, bool durable = true,
bool autoDelete = true)
bool autoDelete = false)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

Expand All @@ -49,7 +49,7 @@ public void Consume(Func<BasicDeliverEventArgs, Task> handler)
_consumer.Received += ConsumerReceivedAsync;
_consumer.ConsumerCancelled += ConsumerCancelledAsync;

_consumer.Model.EnsureQueue(_queueName, _durable, _autoDelete);
_consumer.Model.EnsureQueue(_queueName, durable: _durable, autoDelete: _autoDelete);
_consumerTag = _consumer.Model.BasicConsume(_queueName, _autoAck, _consumer);

Log.ConsumingConsumer(_logger, _queueName, _consumerTag);
Expand All @@ -66,7 +66,7 @@ private Task ConsumerCancelledAsync(object sender, ConsumerEventArgs @event)

lock (_consumer.Model)
{
_consumer.Model.EnsureQueue(_queueName, _durable, _autoDelete);
_consumer.Model.EnsureQueue(_queueName, durable: _durable, autoDelete: _autoDelete);
var consumerTag = _consumer.Model.BasicConsume(_queueName, _autoAck, _consumer);
Log.RestoredConsumer(_logger, consumerTag, _queueName, _consumerTag);
_consumerTag = consumerTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ internal static class LoggingEventIds

public const int TenantTransportPublishedRequest = 10401;
public const int TenantTransportErrorDispatchingRequest = 10402;
public const int TenantTransportLoadedTenantNames = 10403;
public const int TenantTransportEnsuredTenantQueue = 10404;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,32 @@ internal static class ModelExtensions
/// <param name="queueName">The name of the queue.</param>
/// <param name="durable">The queue should survive a broker restart.</param>
/// <param name="autoDelete">The queue should be deleted when the last consumer goes away.</param>
public static void EnsureQueue(this IModel model, string queueName, bool durable = true, bool autoDelete = true)
public static void EnsureQueue(this IModel model, string queueName, bool durable = true, bool autoDelete = false)
{
model.ExchangeDeclare(Constants.ExchangeName, ExchangeType.Direct);
model.QueueDeclare(queueName, autoDelete: autoDelete, durable: durable, exclusive: false);
model.QueueDeclare(queueName, durable: durable, exclusive: false, autoDelete: autoDelete);
model.QueueBind(queueName, Constants.ExchangeName, queueName);
}

/// <summary>
/// Convenience method to publish a payload as JSON to a queue. Ensures the existence of the queue when
/// <paramref name="persistent"/> is set to true.
/// Convenience method to publish a payload as JSON to a queue.
/// </summary>
/// <param name="model">The <see cref="IModel"/> used to communicate with Rabbit MQ.</param>
/// <param name="queueName">The name of the queue.</param>
/// <param name="payload">The payload to serialize as JSON and publish to the queue.</param>
/// <param name="persistent">The publication should survive a broker restart (when the queue supports it).</param>
/// <param name="durable">The queue should survive a broker restart.</param>
/// <param name="autoDelete">The queue should be deleted when the last consumer goes away.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public static Task PublishJsonAsync(this IModel model, string queueName, object payload, bool persistent = true,
bool durable = true, bool autoDelete = true)
public static Task PublishJsonAsync(this IModel model, string queueName, object payload, bool persistent = false)
{
var properties = model.CreateBasicProperties();
properties.Persistent = persistent;
properties.ContentType = "application/json";

var body = JsonSerializer.SerializeToUtf8Bytes(payload);

lock (model)
{
if (persistent)
{
model.EnsureQueue(queueName, durable, autoDelete);
}

model.BasicPublish(Constants.ExchangeName, queueName, properties,
JsonSerializer.SerializeToUtf8Bytes(payload));
model.BasicPublish(Constants.ExchangeName, queueName, properties, body);
}

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ public static class RelayServerBuilderExtensions

if (useTenantRouting)
{
builder.Services.AddSingleton<ITenantTransport<TRequest>, TenantTransport<TRequest, TAcknowledge>>();
builder.Services.AddSingleton<TenantTransport<TRequest, TAcknowledge>>();
builder.Services.AddHostedService(provider
=> provider.GetRequiredService<TenantTransport<TRequest, TAcknowledge>>());

builder.Services.AddSingleton<ITenantTransport<TRequest>>(provider
=> provider.GetRequiredService<TenantTransport<TRequest, TAcknowledge>>());

builder.Services.AddSingleton<ITenantHandlerFactory, TenantHandlerFactory<TRequest, TAcknowledge>>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void Dispose()
public async Task DispatchResponseAsync(TResponse response)
{
await _responseDispatchModel.PublishJsonAsync($"{Constants.ResponseQueuePrefix} {response.RequestOriginId}",
response, durable: false, persistent: false);
response);
Log.DispatchedResponse(_logger, response.RequestId, response.RequestOriginId);
}

Expand All @@ -93,7 +93,7 @@ public async Task DispatchAcknowledgeAsync(TAcknowledge request)
Log.DispatchingAcknowledge(_logger, request);

await _acknowledgeDispatchModel.PublishJsonAsync($"{Constants.AcknowledgeQueuePrefix} {request.OriginId}",
request, durable: false, persistent: false);
request);
Log.DispatchedAcknowledge(_logger, request.RequestId, request.OriginId);
}

Expand All @@ -114,21 +114,19 @@ private async Task AcknowledgeConsumerReceivedAsync(BasicDeliverEventArgs @event
await _acknowledgeCoordinator.ProcessAcknowledgeAsync(request);
}

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken)
Task IHostedService.StartAsync(CancellationToken cancellationToken)
{
_responseConsumer = new DisposableConsumer(_logger, _responseConsumeModel,
$"{Constants.ResponseQueuePrefix} {_originId}");
$"{Constants.ResponseQueuePrefix} {_originId}", autoDelete: true);
_responseConsumer.Consume(ResponseConsumerReceivedAsync);

_acknowledgeConsumer = new DisposableConsumer(_logger, _acknowledgeConsumeModel,
$"{Constants.AcknowledgeQueuePrefix} {_originId}");
$"{Constants.AcknowledgeQueuePrefix} {_originId}", autoDelete: true);
_acknowledgeConsumer.Consume(AcknowledgeConsumerReceivedAsync);

return Task.CompletedTask;
}

/// <inheritdoc />
public Task StopAsync(CancellationToken cancellationToken)
Task IHostedService.StopAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public TenantHandler(ILogger<TenantHandler<TRequest, TAcknowledge>> logger, stri
}

_consumer = new DisposableConsumer(_logger, _model, $"{Constants.RequestQueuePrefix} {tenantName}",
autoDelete: false, autoAck: false);
autoAck: false);
_consumer.Consume(ConsumerReceivedAsync);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,13 @@ private static partial class Log
"An error occured while dispatching request {RelayRequestId} to tenant {TenantName} queue")]
public static partial void ErrorDispatchingRequest(ILogger logger, Exception ex, Guid relayRequestId,
string tenantName);

[LoggerMessage(LoggingEventIds.TenantTransportLoadedTenantNames, LogLevel.Information,
"Loaded {TenantCount} tenant names")]
public static partial void LoadedTenantNames(ILogger logger, int tenantCount);

[LoggerMessage(LoggingEventIds.TenantTransportEnsuredTenantQueue, LogLevel.Trace,
"Ensured queue {QueueName} for tenant {TenantName}")]
public static partial void EnsuredTenantQueue(ILogger logger, string queueName, string tenantName);
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Thinktecture.Relay.Acknowledgement;
using Thinktecture.Relay.Server.Persistence;
using Thinktecture.Relay.Server.Transport;
using Thinktecture.Relay.Transport;

namespace Thinktecture.Relay.Server.Protocols.RabbitMq;

/// <inheritdoc cref="ITenantTransport{T}"/>
public partial class TenantTransport<TRequest, TAcknowledge> : ITenantTransport<TRequest>, IDisposable
public partial class TenantTransport<TRequest, TAcknowledge> : ITenantTransport<TRequest>, IHostedService, IDisposable
where TRequest : IClientRequest
where TAcknowledge : IAcknowledgeRequest
{
private readonly ILogger _logger;
private readonly IModel _model;
private readonly IServiceProvider _serviceProvider;

/// <inheritdoc />
public int? BinarySizeThreshold { get; }
Expand All @@ -25,15 +30,17 @@ public partial class TenantTransport<TRequest, TAcknowledge> : ITenantTransport<
/// Initializes a new instance of the <see cref="TenantTransport{TRequest,TAcknowledge}"/> class.
/// </summary>
/// <param name="logger">An <see cref="ILogger{TCatgeory}"/>.</param>
/// <param name="serviceProvider">An <see cref="IServiceProvider"/>.</param>
/// <param name="modelFactory">The <see cref="ModelFactory{TAcknowledge}"/>.</param>
/// <param name="rabbitMqOptions">An <see cref="IOptions{TOptions}"/>.</param>
public TenantTransport(ILogger<TenantTransport<TRequest, TAcknowledge>> logger, ModelFactory<TAcknowledge> modelFactory,
IOptions<RabbitMqOptions> rabbitMqOptions)
public TenantTransport(ILogger<TenantTransport<TRequest, TAcknowledge>> logger, IServiceProvider serviceProvider,
ModelFactory<TAcknowledge> modelFactory, IOptions<RabbitMqOptions> rabbitMqOptions)
{
if (modelFactory is null) throw new ArgumentNullException(nameof(modelFactory));
if (rabbitMqOptions is null) throw new ArgumentNullException(nameof(rabbitMqOptions));

_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_model = modelFactory.Create("tenant dispatcher");

BinarySizeThreshold = rabbitMqOptions.Value.MaximumBinarySize;
Expand All @@ -44,7 +51,7 @@ public async Task TransportAsync(TRequest request)
{
try
{
await _model.PublishJsonAsync($"{Constants.RequestQueuePrefix} {request.TenantName}", request, autoDelete: false);
await _model.PublishJsonAsync($"{Constants.RequestQueuePrefix} {request.TenantName}", request);
Log.PublishedRequest(_logger, request.RequestId, request.TenantName);
}
catch (RabbitMQClientException ex)
Expand All @@ -57,4 +64,23 @@ public async Task TransportAsync(TRequest request)
/// <inheritdoc />
public void Dispose()
=> _model.Dispose();

async Task IHostedService.StartAsync(CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var tenantService = scope.ServiceProvider.GetRequiredService<ITenantService>();

var tenantNames = await tenantService.LoadAllTenantNamesAsync(cancellationToken);
Log.LoadedTenantNames(_logger, tenantNames.Length);

foreach (var tenantName in tenantNames)
{
var queueName = $"{Constants.RequestQueuePrefix} {tenantName}";
_model.EnsureQueue(queueName);
Log.EnsuredTenantQueue(_logger, queueName, tenantName);
}
}

Task IHostedService.StopAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Thinktecture.Relay.Transport;
Expand All @@ -19,6 +20,9 @@ public InMemoryTenantTransport(ILogger<InMemoryTenantTransport<T>> logger, Conne
_connectorRegistry = connectorRegistry ?? throw new ArgumentNullException(nameof(connectorRegistry));
}

public Task EnsureKnownTransports(CancellationToken cancellationToken = default)
=> Task.CompletedTask;

public async Task TransportAsync(T request)
{
if (!await _connectorRegistry.TryDeliverRequestAsync(request))
Expand Down

0 comments on commit 50d51fd

Please sign in to comment.