Skip to content

Commit

Permalink
Add Default Message Mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon committed Dec 2, 2024
1 parent 294e38b commit cb9fc8d
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public IBrighterBuilder MapperRegistry(Action<ServiceCollectionMessageMapperRegi
if (registerMappers == null) throw new ArgumentNullException(nameof(registerMappers));

registerMappers(_mapperRegistry);
_mapperRegistry.EnsureDefaultMessageMapperIsRegistered();

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ public static MessageMapperRegistry MessageMapperRegistry(IServiceProvider provi

var messageMapperRegistry = new MessageMapperRegistry(
new ServiceProviderMapperFactory(provider),
new ServiceProviderMapperFactoryAsync(provider)
new ServiceProviderMapperFactoryAsync(provider),
serviceCollectionMessageMapperRegistry.DefaultMessageMapper,
serviceCollectionMessageMapperRegistry.DefaultMessageMapperAsync
);

foreach (var messageMapper in serviceCollectionMessageMapperRegistry.Mappers)
Expand All @@ -391,6 +393,7 @@ public static MessageMapperRegistry MessageMapperRegistry(IServiceProvider provi
{
messageMapperRegistry.RegisterAsync(messageMapper.Key, messageMapper.Value);
}


return messageMapperRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,23 @@ THE SOFTWARE. */
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Paramore.Brighter.MessageMappers;

namespace Paramore.Brighter.Extensions.DependencyInjection
{
/// <summary>
/// When parsing for message mappers in assemblies, stores any found message mappers. A later step will add these to the message mapper registry
/// Not used directly
/// </summary>
public class ServiceCollectionMessageMapperRegistry
public class ServiceCollectionMessageMapperRegistry(
IServiceCollection serviceCollection,
ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
private readonly IServiceCollection _serviceCollection;
private readonly ServiceLifetime _lifetime;

public Dictionary<Type, Type> Mappers { get; } = new Dictionary<Type, Type>();
public Type DefaultMessageMapper { get; private set; } = typeof(JsonMessageMapper<>);
public Dictionary<Type, Type> AsyncMappers { get; } = new Dictionary<Type, Type>();

public ServiceCollectionMessageMapperRegistry(IServiceCollection serviceCollection, ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
_serviceCollection = serviceCollection;
_lifetime = lifetime;
}

public Type DefaultMessageMapperAsync { get; private set; } = typeof(JsonMessageMapper<>);

/// <summary>
/// Register a mapper with the collection (generic version)
/// </summary>
Expand Down Expand Up @@ -76,7 +72,7 @@ public void RegisterAsync<TRequest, TMessageMapper>() where TRequest : class, IR
/// <param name="mapper">The type of the mapper</param>
public void Add(Type message, Type mapper)
{
_serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, _lifetime));
serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, lifetime));
Mappers.Add(message, mapper);
}

Expand All @@ -87,8 +83,40 @@ public void Add(Type message, Type mapper)
/// <param name="mapper">The type of the mapper</param>
public void AddAsync(Type message, Type mapper)
{
_serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, _lifetime));
serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, lifetime));
AsyncMappers.Add(message, mapper);
}

/// <summary>
/// Set the Default Message Mapper
/// </summary>
/// <param name="defaultMapper">Type of default Message Mapper</param>
public void SetDefaultMessageMapper(Type defaultMapper)
{
serviceCollection.TryAdd(new ServiceDescriptor(defaultMapper, defaultMapper, lifetime));
DefaultMessageMapper = defaultMapper;
}

/// <summary>
/// Set the Default Async Message Mapper
/// </summary>
/// <param name="defaultMapper">Type of default async message mapper</param>
public void SetDefaultMessageMapperAsync(Type defaultMapper)
{
serviceCollection.TryAdd(new ServiceDescriptor(defaultMapper, defaultMapper, lifetime));
DefaultMessageMapperAsync = defaultMapper;
}

/// <summary>
/// Ensure that the default mappers are registered with Dependency Injection
/// </summary>
public void EnsureDefaultMessageMapperIsRegistered()
{
if(DefaultMessageMapper != null)
serviceCollection.TryAdd(new ServiceDescriptor(DefaultMessageMapper, DefaultMessageMapper, lifetime));
if (DefaultMessageMapperAsync != null)
serviceCollection.TryAdd(new ServiceDescriptor(DefaultMessageMapperAsync, DefaultMessageMapperAsync,
lifetime));
}
}
}
47 changes: 29 additions & 18 deletions src/Paramore.Brighter/MessageMapperRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using Paramore.Brighter.MessageMappers;

namespace Paramore.Brighter
{
Expand All @@ -40,16 +41,24 @@ public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMappe
private readonly IAmAMessageMapperFactoryAsync? _messageMapperFactoryAsync;
private readonly Dictionary<Type, Type> _messageMappers = new Dictionary<Type, Type>();
private readonly Dictionary<Type, Type> _asyncMessageMappers = new Dictionary<Type, Type>();
private readonly Type? _defaultMessageMapper;
private readonly Type? _defaultMessageMapperAsync;

/// <summary>
/// Initializes a new instance of the <see cref="MessageMapperRegistry"/> class.
/// </summary>
/// <param name="messageMapperFactory">The message mapper factory.</param>
/// <param name="messageMapperFactoryAsync">The async message mapper factory</param>
public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, IAmAMessageMapperFactoryAsync? messageMapperFactoryAsync)
/// <param name="defaultMessageMapper">The default message Mapper</param>
/// <param name="defaultMessageMapperAsync">The default Async Message Mapper</param>
public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory,
IAmAMessageMapperFactoryAsync? messageMapperFactoryAsync, Type? defaultMessageMapper = null,
Type? defaultMessageMapperAsync = null)
{
_messageMapperFactory = messageMapperFactory;
_messageMapperFactoryAsync = messageMapperFactoryAsync;
_defaultMessageMapper = defaultMessageMapper;
_defaultMessageMapperAsync = defaultMessageMapperAsync;

if (messageMapperFactory == null && messageMapperFactoryAsync == null)
throw new ConfigurationException("Should have at least one factory");
Expand All @@ -62,33 +71,35 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, IAm
/// <returns>IAmAMessageMapper&lt;TRequest&gt;.</returns>
public IAmAMessageMapper<TRequest>? Get<TRequest>() where TRequest : class, IRequest
{
if ( _messageMapperFactory is not null && _messageMappers.ContainsKey(typeof(TRequest)))
{
var messageMapperType = _messageMappers[typeof(TRequest)];
return (IAmAMessageMapper<TRequest>)_messageMapperFactory.Create(messageMapperType);
}
else
{
if (_messageMapperFactory is null)
return null;
}

var messageMapperType = _messageMappers.ContainsKey(typeof(TRequest))
? _messageMappers[typeof(TRequest)]
: _defaultMessageMapper;

if (messageMapperType is null) return null;

return (IAmAMessageMapper<TRequest>)_messageMapperFactory.Create(messageMapperType);
}

/// <summary>
/// Gets this instance.
/// </summary>
/// <typeparam name="TRequest">The type of the t request.</typeparam>
/// <returns>IAmAMessageMapperAsync&lt;TRequest&gt;.</returns>
public IAmAMessageMapperAsync<TRequest>? GetAsync<TRequest>() where TRequest : class, IRequest
{
if (_messageMapperFactoryAsync is not null && _asyncMessageMappers.ContainsKey(typeof(TRequest)))
{
var messageMapperType = _asyncMessageMappers[typeof(TRequest)];
return (IAmAMessageMapperAsync<TRequest>)_messageMapperFactoryAsync.Create(messageMapperType);
}
else
{
if (_messageMapperFactoryAsync is null)
return null;
}

var messageMapperType = _asyncMessageMappers.ContainsKey(typeof(TRequest))
? _asyncMessageMappers[typeof(TRequest)]
: _defaultMessageMapperAsync;

if (messageMapperType is null) return null;

return (IAmAMessageMapperAsync<TRequest>)_messageMapperFactoryAsync.Create(messageMapperType);
}

/// <summary>
Expand Down
47 changes: 47 additions & 0 deletions src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Paramore.Brighter.MessageMappers;

public class JsonMessageMapper<TRequest>(RequestContext? context) : IAmAMessageMapper<TRequest>, IAmAMessageMapperAsync<TRequest> where TRequest : class, IRequest
{
public IRequestContext? Context { get; set; } = context;

public Task<Message> MapToMessageAsync(TRequest request, Publication publication,
CancellationToken cancellationToken = default)
=> Task.FromResult(MapToMessage(request, publication));

public Task<TRequest> MapToRequestAsync(Message message, CancellationToken cancellationToken = default)
=> Task.FromResult(MapToRequest(message));

public Message MapToMessage(TRequest request, Publication publication)
{
MessageType messageType = request switch
{
ICommand => MessageType.MT_COMMAND,
IEvent => MessageType.MT_EVENT,
_ => throw new ArgumentException(@"This message mapper can only map Commands and Events", nameof(request))
};

if(publication.Topic is null)
throw new ArgumentException($"No Topic Defined for {publication}");

var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, messageType: messageType);

var body = new MessageBody(JsonSerializer.Serialize(request, JsonSerialisationOptions.Options));
var message = new Message(header, body);
return message;
}

public TRequest MapToRequest(Message message)
{
var request = JsonSerializer.Deserialize<TRequest>(message.Body.Value, JsonSerialisationOptions.Options);

if (request is null)
throw new ArgumentException($"Unable to deseralise message body for {message.Header.Topic}");

return request;
}
}

0 comments on commit cb9fc8d

Please sign in to comment.