From 8871301d53edfaf3ec81290a2eeadfcef37e7b98 Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Fri, 30 Apr 2021 10:03:47 +0100 Subject: [PATCH] feat!: change the serializers to use async and streams - The IMessageSerializer was renamed to ISerializer - The method Serialize was changed to SerializeAsync, to return a Task and writing the result into a Stream - The method Deserialize was changed to DeserializeAsync, to return a Task and read the data from a Stream - All the existing serializer was changed to use the new interface --- .../IMessageSerializer.cs | 25 -------- src/KafkaFlow.Abstractions/ISerializer.cs | 30 ++++++++++ .../ISerializerContext.cs | 9 +++ .../SerializerContext.cs | 15 +++++ .../ConfluentDeserializerWrapper.cs | 25 +++++--- .../ConfluentSerializerWrapper.cs | 18 ++++-- .../KafkaFlow.SchemaRegistry.csproj | 1 + .../JsonCoreSerializer.cs | 27 +++++---- .../NewtonsoftJsonSerializer.cs | 30 +++++----- .../ProtobufNetSerializer.cs | 25 ++++---- .../ConfluentAvroSerializer.cs | 12 ++-- .../ConfluentJsonSerializer.cs | 12 ++-- .../ConfluentProtobufSerializer.cs | 12 ++-- .../ConfigurationBuilderExtensions.cs | 60 +++++++++---------- .../KafkaFlow.Serializer.csproj | 6 +- .../SerializerConsumerMiddleware.cs | 25 +++++--- .../SerializerProducerMiddleware.cs | 27 +++++++-- .../SerializerConsumerMiddlewareTests.cs | 21 ++++--- .../SerializerProducerMiddlewareTests.cs | 22 +++++-- 19 files changed, 250 insertions(+), 152 deletions(-) delete mode 100644 src/KafkaFlow.Abstractions/IMessageSerializer.cs create mode 100644 src/KafkaFlow.Abstractions/ISerializer.cs create mode 100644 src/KafkaFlow.Abstractions/ISerializerContext.cs create mode 100644 src/KafkaFlow.Abstractions/SerializerContext.cs diff --git a/src/KafkaFlow.Abstractions/IMessageSerializer.cs b/src/KafkaFlow.Abstractions/IMessageSerializer.cs deleted file mode 100644 index 4a8083373..000000000 --- a/src/KafkaFlow.Abstractions/IMessageSerializer.cs +++ /dev/null @@ -1,25 +0,0 @@ -namespace KafkaFlow -{ - using System; - - /// - /// Used to implement a message serializer - /// - public interface IMessageSerializer - { - /// - /// Serializes the given message - /// - /// The message to be serialized - /// The serialized message - byte[] Serialize(object message); - - /// - /// Deserializes the given message - /// - /// The message to be deserialized - /// The type to be created - /// The deserialized message - object Deserialize(byte[] message, Type type); - } -} diff --git a/src/KafkaFlow.Abstractions/ISerializer.cs b/src/KafkaFlow.Abstractions/ISerializer.cs new file mode 100644 index 000000000..0b0951997 --- /dev/null +++ b/src/KafkaFlow.Abstractions/ISerializer.cs @@ -0,0 +1,30 @@ +namespace KafkaFlow +{ + using System; + using System.IO; + using System.Threading.Tasks; + + /// + /// Used to implement a message serializer + /// + public interface ISerializer + { + /// + /// Serializes the given message + /// + /// The message to be serialized + /// A stream to write the serialized data + /// An object containing metadata + /// The serialized message + Task SerializeAsync(object message, Stream output, ISerializerContext context); + + /// + /// Deserializes the given message + /// + /// A stream to read the data to be deserialized + /// The type to be created + /// An object containing metadata + /// The deserialized message + Task DeserializeAsync(Stream input, Type type, ISerializerContext context); + } +} diff --git a/src/KafkaFlow.Abstractions/ISerializerContext.cs b/src/KafkaFlow.Abstractions/ISerializerContext.cs new file mode 100644 index 000000000..661854213 --- /dev/null +++ b/src/KafkaFlow.Abstractions/ISerializerContext.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow +{ + /// + /// A context that can have some metadata to help with serialization process + /// + public interface ISerializerContext + { + } +} diff --git a/src/KafkaFlow.Abstractions/SerializerContext.cs b/src/KafkaFlow.Abstractions/SerializerContext.cs new file mode 100644 index 000000000..cd8cf7c50 --- /dev/null +++ b/src/KafkaFlow.Abstractions/SerializerContext.cs @@ -0,0 +1,15 @@ +namespace KafkaFlow +{ + /// + public class SerializerContext : ISerializerContext + { + /// + /// The static instance of the + /// + public static readonly ISerializerContext Empty = new SerializerContext(); + + private SerializerContext() + { + } + } +} diff --git a/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs b/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs index 2bb4f52b2..b3d7c0f72 100644 --- a/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs +++ b/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs @@ -2,13 +2,18 @@ namespace KafkaFlow { using System; using System.Collections.Concurrent; + using System.IO; + using System.Threading.Tasks; using Confluent.Kafka; + using Microsoft.IO; /// /// A wrapper to call the typed Confluent deserializers /// public abstract class ConfluentDeserializerWrapper { + private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new(); + private static readonly ConcurrentDictionary Deserializers = new(); /// @@ -31,9 +36,9 @@ public static ConfluentDeserializerWrapper GetOrCreateDeserializer( /// /// Deserialize a message using the passed deserializer /// - /// The message to deserialize + /// The message stream to deserialize /// - public abstract object Deserialize(byte[] message); + public abstract Task DeserializeAsync(Stream input); private class InnerConfluentDeserializerWrapper : ConfluentDeserializerWrapper { @@ -44,12 +49,18 @@ public InnerConfluentDeserializerWrapper(Func deserializerFactory) this.deserializer = (IAsyncDeserializer) deserializerFactory(); } - public override object Deserialize(byte[] message) + public override async Task DeserializeAsync(Stream input) { - return this.deserializer - .DeserializeAsync(message, message == null, SerializationContext.Empty) - .GetAwaiter() - .GetResult(); + using var buffer = MemoryStreamManager.GetStream(); + + await input.CopyToAsync(buffer).ConfigureAwait(false); + + return await this.deserializer + .DeserializeAsync( + new ReadOnlyMemory(buffer.GetBuffer(), 0, (int) buffer.Length), + false, + Confluent.Kafka.SerializationContext.Empty) + .ConfigureAwait(false); } } } diff --git a/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs b/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs index 7b529b820..c2f9bfba6 100644 --- a/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs +++ b/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs @@ -2,6 +2,8 @@ namespace KafkaFlow { using System; using System.Collections.Concurrent; + using System.IO; + using System.Threading.Tasks; using Confluent.Kafka; /// @@ -32,8 +34,9 @@ public static ConfluentSerializerWrapper GetOrCreateSerializer( /// Serialize a message using the passed serializer /// /// The message to serialize + /// Where the serialization result will be stored /// - public abstract byte[] Serialize(object message); + public abstract Task SerializeAsync(object message, Stream output); private class InnerConfluentSerializerWrapper : ConfluentSerializerWrapper { @@ -44,12 +47,15 @@ public InnerConfluentSerializerWrapper(Func serializerFactory) this.serializer = (IAsyncSerializer) serializerFactory(); } - public override byte[] Serialize(object message) + public override async Task SerializeAsync(object message, Stream output) { - return this.serializer - .SerializeAsync((T) message, SerializationContext.Empty) - .GetAwaiter() - .GetResult(); + var data = await this.serializer + .SerializeAsync((T) message, Confluent.Kafka.SerializationContext.Empty) + .ConfigureAwait(false); + + await output + .WriteAsync(data, 0, data.Length) + .ConfigureAwait(false); } } } diff --git a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj index 54b9b5d04..2a0a2701d 100644 --- a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj +++ b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj @@ -9,6 +9,7 @@ + diff --git a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs index f88d26aed..b29a082b1 100644 --- a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs +++ b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs @@ -1,12 +1,14 @@ namespace KafkaFlow.Serializer { using System; + using System.IO; using System.Text.Json; + using System.Threading.Tasks; /// /// A message serializer using System.Text.Json library /// - public class JsonCoreSerializer : IMessageSerializer + public class JsonCoreSerializer : ISerializer { private readonly JsonSerializerOptions options; @@ -27,21 +29,22 @@ public JsonCoreSerializer() { } - /// Serializes the message - /// The message to be serialized - /// A UTF8 JSON string - public byte[] Serialize(object message) + /// + public Task SerializeAsync(object message, Stream output, ISerializerContext context) { - return JsonSerializer.SerializeToUtf8Bytes(message, this.options); + using var writer = new Utf8JsonWriter(output); + + JsonSerializer.Serialize(writer, message, this.options); + + return Task.CompletedTask; } - /// Deserialize the message - /// The message to be deserialized (cannot be null) - /// The destination type - /// An instance of the passed type - public object Deserialize(byte[] data, Type type) + /// + public async Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { - return JsonSerializer.Deserialize(data, type, this.options); + return await JsonSerializer + .DeserializeAsync(input, type, this.options) + .ConfigureAwait(false); } } } diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs index 1c45a9291..efcf9630d 100644 --- a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs @@ -1,13 +1,15 @@ namespace KafkaFlow.Serializer { using System; + using System.IO; using System.Text; + using System.Threading.Tasks; using Newtonsoft.Json; /// /// A message serializer using NewtonsoftJson library /// - public class NewtonsoftJsonSerializer : IMessageSerializer + public class NewtonsoftJsonSerializer : ISerializer { private readonly JsonSerializerSettings settings; @@ -28,24 +30,24 @@ public NewtonsoftJsonSerializer() { } - /// Serializes the message - /// The message to be serialized - /// A UTF8 JSON string - public byte[] Serialize(object message) + /// + public Task SerializeAsync(object message, Stream output, ISerializerContext context) { - var serialized = JsonConvert.SerializeObject(message, this.settings); - return Encoding.UTF8.GetBytes(serialized); + using var sw = new StreamWriter(output, Encoding.UTF8); + var serializer = JsonSerializer.CreateDefault(this.settings); + + serializer.Serialize(sw, message); + + return Task.CompletedTask; } - /// Deserialize the message - /// The message to be deserialized (cannot be null) - /// The destination type - /// An instance of the passed type - public object Deserialize(byte[] data, Type type) + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { - var json = Encoding.UTF8.GetString(data); + using var sr = new StreamReader(input, Encoding.UTF8); + var serializer = JsonSerializer.CreateDefault(this.settings); - return JsonConvert.DeserializeObject(json, type, this.settings); + return Task.FromResult(serializer.Deserialize(sr, type)); } } } diff --git a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs index 424d1c7ec..50ff531a9 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs +++ b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs @@ -2,31 +2,26 @@ { using System; using System.IO; + using System.Threading.Tasks; using ProtoBuf; /// /// A message serializer using protobuf-net library /// - public class ProtobufNetSerializer : IMessageSerializer + public class ProtobufNetSerializer : ISerializer { - /// Serializes the message - /// The message to be serialized - /// The serialized message - public byte[] Serialize(object message) + /// + public Task SerializeAsync(object message, Stream output, ISerializerContext context) { - using var stream = new MemoryStream(); - Serializer.Serialize(stream, message); - return stream.ToArray(); + Serializer.Serialize(output, message); + + return Task.CompletedTask; } - /// Deserialize the message - /// The message to be deserialized - /// The destination type - /// The deserialized message - public object Deserialize(byte[] data, Type type) + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { - using var stream = new MemoryStream(data); - return Serializer.Deserialize(type, stream); + return Task.FromResult(Serializer.Deserialize(type, input)); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs index 383b7b6b5..5ef22c056 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs @@ -1,13 +1,15 @@ namespace KafkaFlow.Serializer.SchemaRegistry { using System; + using System.IO; + using System.Threading.Tasks; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; /// /// A message serializer using Apache.Avro library /// - public class ConfluentAvroSerializer : IMessageSerializer + public class ConfluentAvroSerializer : ISerializer { private readonly ISchemaRegistryClient schemaRegistryClient; private readonly AvroSerializerConfig serializerConfig; @@ -39,7 +41,7 @@ public ConfluentAvroSerializer( } /// - public byte[] Serialize(object message) + public Task SerializeAsync(object message, Stream output, ISerializerContext context) { return ConfluentSerializerWrapper .GetOrCreateSerializer( @@ -48,11 +50,11 @@ public byte[] Serialize(object message) typeof(AvroSerializer<>).MakeGenericType(message.GetType()), this.schemaRegistryClient, this.serializerConfig)) - .Serialize(message); + .SerializeAsync(message, output); } /// - public object Deserialize(byte[] data, Type type) + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { return ConfluentDeserializerWrapper .GetOrCreateDeserializer( @@ -62,7 +64,7 @@ public object Deserialize(byte[] data, Type type) typeof(AvroDeserializer<>).MakeGenericType(type), this.schemaRegistryClient, new AvroDeserializerConfig())) - .Deserialize(data); + .DeserializeAsync(input); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs index b19cb1ee4..f0ffdfe4e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs @@ -2,7 +2,9 @@ { using System; using System.Collections.Generic; + using System.IO; using System.Linq; + using System.Threading.Tasks; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; using NJsonSchema.Generation; @@ -10,7 +12,7 @@ /// /// A json message serializer integrated with the confluent schema registry /// - public class ConfluentJsonSerializer : IMessageSerializer + public class ConfluentJsonSerializer : ISerializer { private readonly ISchemaRegistryClient schemaRegistryClient; private readonly JsonSerializerConfig serializerConfig; @@ -56,7 +58,7 @@ public ConfluentJsonSerializer( } /// - public byte[] Serialize(object message) + public Task SerializeAsync(object message, Stream output, ISerializerContext context) { return ConfluentSerializerWrapper .GetOrCreateSerializer( @@ -66,11 +68,11 @@ public byte[] Serialize(object message) this.schemaRegistryClient, this.serializerConfig, this.schemaGeneratorSettings)) - .Serialize(message); + .SerializeAsync(message, output); } /// - public object Deserialize(byte[] message, Type type) + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { return ConfluentDeserializerWrapper .GetOrCreateDeserializer( @@ -80,7 +82,7 @@ public object Deserialize(byte[] message, Type type) typeof(JsonDeserializer<>).MakeGenericType(type), Enumerable.Empty>(), null)) - .Deserialize(message); + .DeserializeAsync(input); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs index 7f6f5b309..b58395f7e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs @@ -2,14 +2,16 @@ { using System; using System.Collections.Generic; + using System.IO; using System.Linq; + using System.Threading.Tasks; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; /// /// A protobuf message serializer integrated with the confluent schema registry /// - public class ConfluentProtobufSerializer : IMessageSerializer + public class ConfluentProtobufSerializer : ISerializer { private readonly ISchemaRegistryClient schemaRegistryClient; private readonly ProtobufSerializerConfig serializerConfig; @@ -39,7 +41,7 @@ public ConfluentProtobufSerializer(IDependencyResolver resolver, ProtobufSeriali } /// - public byte[] Serialize(object message) + public Task SerializeAsync(object message, Stream output, ISerializerContext context) { return ConfluentSerializerWrapper .GetOrCreateSerializer( @@ -48,11 +50,11 @@ public byte[] Serialize(object message) typeof(ProtobufSerializer<>).MakeGenericType(message.GetType()), this.schemaRegistryClient, this.serializerConfig)) - .Serialize(message); + .SerializeAsync(message, output); } /// - public object Deserialize(byte[] message, Type type) + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { return ConfluentDeserializerWrapper .GetOrCreateDeserializer( @@ -61,7 +63,7 @@ public object Deserialize(byte[] message, Type type) .CreateInstance( typeof(ProtobufDeserializer<>).MakeGenericType(type), Enumerable.Empty>())) - .Deserialize(message); + .DeserializeAsync(input); } } } diff --git a/src/KafkaFlow.Serializer/ConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer/ConfigurationBuilderExtensions.cs index bb34194e0..7d57fba1c 100644 --- a/src/KafkaFlow.Serializer/ConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer/ConfigurationBuilderExtensions.cs @@ -11,12 +11,12 @@ public static class ConfigurationBuilderExtensions /// Registers a middleware to deserialize messages /// /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements /// public static IConsumerMiddlewareConfigurationBuilder AddSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer where TResolver : class, IMessageTypeResolver { middlewares.DependencyConfigurator.AddTransient(); @@ -32,16 +32,16 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements - /// A factory to create a + /// A factory to create a /// A factory to create a /// public static IConsumerMiddlewareConfigurationBuilder AddSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory, Factory resolverFactory) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer where TResolver : class, IMessageTypeResolver { return middlewares.Add( @@ -54,11 +54,11 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// The middleware configuration builder - /// A class that implements + /// A class that implements /// public static IConsumerMiddlewareConfigurationBuilder AddSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { middlewares.DependencyConfigurator.AddTransient(); @@ -70,14 +70,14 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// /// Register a middleware to deserialize messages /// - /// A class that implements + /// A class that implements /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// public static IConsumerMiddlewareConfigurationBuilder AddSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { return middlewares.AddSerializer( serializerFactory, @@ -88,12 +88,12 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// Registers a middleware to serialize messages /// /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements /// public static IProducerMiddlewareConfigurationBuilder AddSerializer( this IProducerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer where TResolver : class, IMessageTypeResolver { middlewares.DependencyConfigurator.AddTransient(); @@ -109,16 +109,16 @@ public static IProducerMiddlewareConfigurationBuilder AddSerializer /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements - /// A factory to create a + /// A factory to create a /// A factory to create a /// public static IProducerMiddlewareConfigurationBuilder AddSerializer( this IProducerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory, Factory resolverFactory) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer where TResolver : class, IMessageTypeResolver { return middlewares.Add( @@ -131,11 +131,11 @@ public static IProducerMiddlewareConfigurationBuilder AddSerializer /// The middleware configuration builder - /// A class that implements + /// A class that implements /// x public static IProducerMiddlewareConfigurationBuilder AddSerializer( this IProducerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { middlewares.DependencyConfigurator.AddTransient(); @@ -149,13 +149,13 @@ public static IProducerMiddlewareConfigurationBuilder AddSerializer /// Registers a middleware to serialize messages /// /// The middleware configuration builder - /// A class that implements - /// A factory to create a + /// A class that implements + /// A factory to create a /// public static IProducerMiddlewareConfigurationBuilder AddSerializer( this IProducerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { return middlewares.AddSerializer( serializerFactory, @@ -166,14 +166,14 @@ public static IProducerMiddlewareConfigurationBuilder AddSerializer /// Register a middleware to deserialize the message to a fixed type /// /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// The message type - /// A class that implements + /// A class that implements /// public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { return middlewares.AddSerializer( serializerFactory, @@ -185,11 +185,11 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer /// The middleware configuration builder /// The message type - /// A class that implements + /// A class that implements /// public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { return middlewares.AddSerializer>(); } @@ -198,14 +198,14 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// The message type - /// A class that implements + /// A class that implements /// public static IProducerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IProducerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { return middlewares.AddSerializer( serializerFactory, @@ -217,11 +217,11 @@ public static IProducerMiddlewareConfigurationBuilder AddSingleTypeSerializer /// The middleware configuration builder /// The message type - /// A class that implements + /// A class that implements /// public static IProducerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IProducerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, IMessageSerializer + where TSerializer : class, ISerializer { return middlewares.AddSerializer>(); } diff --git a/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj b/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj index f54d9dc48..05fe4c6c3 100644 --- a/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj +++ b/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj @@ -8,7 +8,11 @@ - + + + + + diff --git a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs b/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs index 9a0785851..10f9008c3 100644 --- a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs +++ b/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.IO; using System.Threading.Tasks; /// @@ -8,16 +9,16 @@ /// public class SerializerConsumerMiddleware : IMessageMiddleware { - private readonly IMessageSerializer serializer; + private readonly ISerializer serializer; private readonly IMessageTypeResolver typeResolver; /// /// Initializes a new instance of the class. /// - /// Instance of + /// Instance of /// Instance of public SerializerConsumerMiddleware( - IMessageSerializer serializer, + ISerializer serializer, IMessageTypeResolver typeResolver) { this.serializer = serializer; @@ -31,18 +32,19 @@ public SerializerConsumerMiddleware( /// A delegate to the next middleware /// /// Throw if message is not byte[] - public Task Invoke(IMessageContext context, MiddlewareDelegate next) + public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { var messageType = this.typeResolver.OnConsume(context); if (messageType is null) { - return Task.CompletedTask; + return; } if (context.Message.Value is null) { - return next(context); + await next(context).ConfigureAwait(false); + return; } if (context.Message.Value is not byte[] rawData) @@ -51,9 +53,16 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next) $"{nameof(context.Message)} must be a byte array to be deserialized and it is '{context.Message.GetType().FullName}'"); } - var data = this.serializer.Deserialize(rawData, messageType); + using var stream = new MemoryStream(rawData); - return next(context.TransformMessage(context.Message.Key, data)); + var data = await this.serializer + .DeserializeAsync( + stream, + messageType, + SerializerContext.Empty) + .ConfigureAwait(false); + + await next(context.TransformMessage(context.Message.Key, data)).ConfigureAwait(false); } } } diff --git a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs b/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs index 2ded1e32c..03ce23ebd 100644 --- a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs +++ b/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs @@ -1,22 +1,25 @@ namespace KafkaFlow { using System.Threading.Tasks; + using Microsoft.IO; /// /// Middleware to serialize messages when producing /// public class SerializerProducerMiddleware : IMessageMiddleware { - private readonly IMessageSerializer serializer; + private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new(); + + private readonly ISerializer serializer; private readonly IMessageTypeResolver typeResolver; /// /// Initializes a new instance of the class. /// - /// Instance of + /// Instance of /// Instance of public SerializerProducerMiddleware( - IMessageSerializer serializer, + ISerializer serializer, IMessageTypeResolver typeResolver) { this.serializer = serializer; @@ -29,13 +32,25 @@ public SerializerProducerMiddleware( /// The containing the message and metadata /// A delegate to call next middleware /// - public Task Invoke(IMessageContext context, MiddlewareDelegate next) + public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { this.typeResolver.OnProduce(context); - var data = this.serializer.Serialize(context.Message.Value); + byte[] messageValue; + + using (var buffer = MemoryStreamManager.GetStream()) + { + await this.serializer + .SerializeAsync( + context.Message.Value, + buffer, + SerializerContext.Empty) + .ConfigureAwait(false); + + messageValue = buffer.ToArray(); + } - return next(context.TransformMessage(context.Message.Key, data)); + await next(context.TransformMessage(context.Message.Key, messageValue)).ConfigureAwait(false); } } } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index dd2372686..829c26494 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.UnitTests.Serializers { using System; + using System.IO; using System.Threading.Tasks; using FluentAssertions; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -10,7 +11,7 @@ namespace KafkaFlow.UnitTests.Serializers public class SerializerConsumerMiddlewareTests { private Mock contextMock; - private Mock serializerMock; + private Mock serializerMock; private Mock typeResolverMock; private bool nextCalled; @@ -21,7 +22,7 @@ public class SerializerConsumerMiddlewareTests public void Setup() { this.contextMock = new Mock(); - this.serializerMock = new Mock(); + this.serializerMock = new Mock(); this.typeResolverMock = new Mock(); this.target = new SerializerConsumerMiddleware( @@ -44,7 +45,9 @@ public async Task Invoke_NullMessageType_ReturnWithoutCallingNext() this.nextCalled.Should().BeFalse(); this.typeResolverMock.VerifyAll(); this.contextMock.Verify(x => x.TransformMessage(It.IsAny(), It.IsAny()), Times.Never); - this.serializerMock.Verify(x => x.Deserialize(It.IsAny(), It.IsAny()), Times.Never); + this.serializerMock.Verify( + x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), + Times.Never); } [TestMethod] @@ -66,7 +69,9 @@ public async Task Invoke_NullMessage_CallNext() // Assert this.nextCalled.Should().BeTrue(); - this.serializerMock.Verify(x => x.Deserialize(It.IsAny(), It.IsAny()), Times.Never); + this.serializerMock.Verify( + x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), + Times.Never); this.typeResolverMock.VerifyAll(); } @@ -91,7 +96,9 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() act.Should().Throw(); this.nextCalled.Should().BeFalse(); this.contextMock.Verify(x => x.TransformMessage(It.IsAny(), It.IsAny()), Times.Never); - this.serializerMock.Verify(x => x.Deserialize(It.IsAny(), It.IsAny()), Times.Never); + this.serializerMock.Verify( + x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), + Times.Never); this.typeResolverMock.VerifyAll(); } @@ -121,8 +128,8 @@ public async Task Invoke_ValidMessage_Deserialize() .Returns(messageType); this.serializerMock - .Setup(x => x.Deserialize(rawValue, messageType)) - .Returns(deserializedMessage); + .Setup(x => x.DeserializeAsync(It.IsAny(), messageType, It.IsAny())) + .ReturnsAsync(deserializedMessage); // Act await this.target.Invoke( diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index 2e4dfffd8..b2687cba7 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -1,6 +1,10 @@ namespace KafkaFlow.UnitTests.Serializers { + using System; + using System.IO; + using System.Linq; using System.Threading.Tasks; + using AutoFixture; using FluentAssertions; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -8,8 +12,10 @@ namespace KafkaFlow.UnitTests.Serializers [TestClass] public class SerializerProducerMiddlewareTests { + private readonly Fixture fixture = new(); + private Mock contextMock; - private Mock serializerMock; + private Mock serializerMock; private Mock typeResolverMock; private SerializerProducerMiddleware target; @@ -18,7 +24,7 @@ public class SerializerProducerMiddlewareTests public void Setup() { this.contextMock = new Mock(); - this.serializerMock = new Mock(); + this.serializerMock = new Mock(); this.typeResolverMock = new Mock(); this.target = new SerializerProducerMiddleware( @@ -30,7 +36,7 @@ public void Setup() public async Task Invoke_ValidMessage_Serialize() { // Arrange - var rawMessage = new byte[1]; + var rawMessage = this.fixture.Create(); var key = new object(); var deserializedMessage = new Message(key, new TestMessage()); IMessageContext resultContext = null; @@ -44,11 +50,15 @@ public async Task Invoke_ValidMessage_Serialize() this.typeResolverMock.Setup(x => x.OnProduce(this.contextMock.Object)); this.serializerMock - .Setup(x => x.Serialize(deserializedMessage.Value)) - .Returns(rawMessage); + .Setup( + x => x.SerializeAsync( + deserializedMessage.Value, + It.IsAny(), + It.IsAny())) + .Callback((object _, Stream stream, ISerializerContext _) => stream.WriteAsync(rawMessage)); this.contextMock - .Setup(x => x.TransformMessage(key, rawMessage)) + .Setup(x => x.TransformMessage(key, It.Is(value => value.SequenceEqual(rawMessage)))) .Returns(transformedContextMock.Object); // Act