Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: change the serializers to use async and streams #158

Merged
merged 1 commit into from
May 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions src/KafkaFlow.Abstractions/IMessageSerializer.cs

This file was deleted.

30 changes: 30 additions & 0 deletions src/KafkaFlow.Abstractions/ISerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace KafkaFlow
{
using System;
using System.IO;
using System.Threading.Tasks;

/// <summary>
/// Used to implement a message serializer
/// </summary>
public interface ISerializer
{
/// <summary>
/// Serializes the given message
/// </summary>
/// <param name="message">The message to be serialized</param>
/// <param name="output">A stream to write the serialized data</param>
/// <param name="context">An object containing metadata</param>
/// <returns>The serialized message</returns>
Task SerializeAsync(object message, Stream output, ISerializerContext context);

/// <summary>
/// Deserializes the given message
/// </summary>
/// <param name="input">A stream to read the data to be deserialized</param>
/// <param name="type">The type to be created</param>
/// <param name="context">An object containing metadata</param>
/// <returns>The deserialized message</returns>
Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context);
}
}
9 changes: 9 additions & 0 deletions src/KafkaFlow.Abstractions/ISerializerContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace KafkaFlow
{
/// <summary>
/// A context that can have some metadata to help with serialization process
/// </summary>
public interface ISerializerContext
{
}
}
15 changes: 15 additions & 0 deletions src/KafkaFlow.Abstractions/SerializerContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace KafkaFlow
{
/// <inheritdoc />
public class SerializerContext : ISerializerContext
{
/// <summary>
/// The static instance of the <see cref="SerializerContext"/>
/// </summary>
public static readonly ISerializerContext Empty = new SerializerContext();

private SerializerContext()
{
}
}
}
25 changes: 18 additions & 7 deletions src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ namespace KafkaFlow
{
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.IO;

/// <summary>
/// A wrapper to call the typed Confluent deserializers
/// </summary>
public abstract class ConfluentDeserializerWrapper
{
private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new();

private static readonly ConcurrentDictionary<Type, ConfluentDeserializerWrapper> Deserializers = new();

/// <summary>
Expand All @@ -31,9 +36,9 @@ public static ConfluentDeserializerWrapper GetOrCreateDeserializer(
/// <summary>
/// Deserialize a message using the passed deserializer
/// </summary>
/// <param name="message">The message to deserialize</param>
/// <param name="input">The message stream to deserialize</param>
/// <returns></returns>
public abstract object Deserialize(byte[] message);
public abstract Task<object> DeserializeAsync(Stream input);

private class InnerConfluentDeserializerWrapper<T> : ConfluentDeserializerWrapper
{
Expand All @@ -44,12 +49,18 @@ public InnerConfluentDeserializerWrapper(Func<object> deserializerFactory)
this.deserializer = (IAsyncDeserializer<T>) deserializerFactory();
}

public override object Deserialize(byte[] message)
public override async Task<object> DeserializeAsync(Stream input)
{
return this.deserializer
.DeserializeAsync(message, message == null, SerializationContext.Empty)
.GetAwaiter()
.GetResult();
using var buffer = MemoryStreamManager.GetStream();

await input.CopyToAsync(buffer).ConfigureAwait(false);

return await this.deserializer
.DeserializeAsync(
new ReadOnlyMemory<byte>(buffer.GetBuffer(), 0, (int) buffer.Length),
false,
Confluent.Kafka.SerializationContext.Empty)
.ConfigureAwait(false);
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ namespace KafkaFlow
{
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;

/// <summary>
Expand Down Expand Up @@ -32,8 +34,9 @@ public static ConfluentSerializerWrapper GetOrCreateSerializer(
/// Serialize a message using the passed serializer
/// </summary>
/// <param name="message">The message to serialize</param>
/// <param name="output">Where the serialization result will be stored</param>
/// <returns></returns>
public abstract byte[] Serialize(object message);
public abstract Task SerializeAsync(object message, Stream output);

private class InnerConfluentSerializerWrapper<T> : ConfluentSerializerWrapper
{
Expand All @@ -44,12 +47,15 @@ public InnerConfluentSerializerWrapper(Func<object> serializerFactory)
this.serializer = (IAsyncSerializer<T>) serializerFactory();
}

public override byte[] Serialize(object message)
public override async Task SerializeAsync(object message, Stream output)
{
return this.serializer
.SerializeAsync((T) message, SerializationContext.Empty)
.GetAwaiter()
.GetResult();
var data = await this.serializer
.SerializeAsync((T) message, Confluent.Kafka.SerializationContext.Empty)
.ConfigureAwait(false);

await output
.WriteAsync(data, 0, data.Length)
.ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<PackageReference Include="Confluent.SchemaRegistry" Version="1.6.3" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
27 changes: 15 additions & 12 deletions src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
namespace KafkaFlow.Serializer
{
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

/// <summary>
/// A message serializer using System.Text.Json library
/// </summary>
public class JsonCoreSerializer : IMessageSerializer
public class JsonCoreSerializer : ISerializer
{
private readonly JsonSerializerOptions options;

Expand All @@ -27,21 +29,22 @@ public JsonCoreSerializer()
{
}

/// <summary>Serializes the message</summary>
/// <param name="message">The message to be serialized</param>
/// <returns>A UTF8 JSON string</returns>
public byte[] Serialize(object message)
/// <inheritdoc/>
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
{
return JsonSerializer.SerializeToUtf8Bytes(message, this.options);
using var writer = new Utf8JsonWriter(output);

JsonSerializer.Serialize(writer, message, this.options);
filipeesch marked this conversation as resolved.
Show resolved Hide resolved

return Task.CompletedTask;
}

/// <summary>Deserialize the message</summary>
/// <param name="data">The message to be deserialized (cannot be null)</param>
/// <param name="type">The destination type</param>
/// <returns>An instance of the passed type</returns>
public object Deserialize(byte[] data, Type type)
/// <inheritdoc/>
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
return JsonSerializer.Deserialize(data, type, this.options);
return await JsonSerializer
.DeserializeAsync(input, type, this.options)
.ConfigureAwait(false);
}
}
}
30 changes: 16 additions & 14 deletions src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
namespace KafkaFlow.Serializer
{
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

/// <summary>
/// A message serializer using NewtonsoftJson library
/// </summary>
public class NewtonsoftJsonSerializer : IMessageSerializer
public class NewtonsoftJsonSerializer : ISerializer
{
private readonly JsonSerializerSettings settings;

Expand All @@ -28,24 +30,24 @@ public NewtonsoftJsonSerializer()
{
}

/// <summary>Serializes the message</summary>
/// <param name="message">The message to be serialized</param>
/// <returns>A UTF8 JSON string</returns>
public byte[] Serialize(object message)
/// <inheritdoc/>
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
{
var serialized = JsonConvert.SerializeObject(message, this.settings);
return Encoding.UTF8.GetBytes(serialized);
using var sw = new StreamWriter(output, Encoding.UTF8);
var serializer = JsonSerializer.CreateDefault(this.settings);

serializer.Serialize(sw, message);

return Task.CompletedTask;
}

/// <summary>Deserialize the message</summary>
/// <param name="data">The message to be deserialized (cannot be null)</param>
/// <param name="type">The destination type</param>
/// <returns>An instance of the passed type</returns>
public object Deserialize(byte[] data, Type type)
/// <inheritdoc/>
public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
var json = Encoding.UTF8.GetString(data);
using var sr = new StreamReader(input, Encoding.UTF8);
var serializer = JsonSerializer.CreateDefault(this.settings);

return JsonConvert.DeserializeObject(json, type, this.settings);
return Task.FromResult(serializer.Deserialize(sr, type));
}
}
}
25 changes: 10 additions & 15 deletions src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,26 @@
{
using System;
using System.IO;
using System.Threading.Tasks;
using ProtoBuf;

/// <summary>
/// A message serializer using protobuf-net library
/// </summary>
public class ProtobufNetSerializer : IMessageSerializer
public class ProtobufNetSerializer : ISerializer
{
/// <summary>Serializes the message</summary>
/// <param name="message">The message to be serialized</param>
/// <returns>The serialized message</returns>
public byte[] Serialize(object message)
/// <inheritdoc/>
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
{
using var stream = new MemoryStream();
Serializer.Serialize(stream, message);
return stream.ToArray();
Serializer.Serialize(output, message);

return Task.CompletedTask;
}

/// <summary>Deserialize the message </summary>
/// <param name="data">The message to be deserialized</param>
/// <param name="type">The destination type</param>
/// <returns>The deserialized message</returns>
public object Deserialize(byte[] data, Type type)
/// <inheritdoc/>
public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
using var stream = new MemoryStream(data);
return Serializer.Deserialize(type, stream);
return Task.FromResult(Serializer.Deserialize(type, input));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
namespace KafkaFlow.Serializer.SchemaRegistry
{
using System;
using System.IO;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

/// <summary>
/// A message serializer using Apache.Avro library
/// </summary>
public class ConfluentAvroSerializer : IMessageSerializer
public class ConfluentAvroSerializer : ISerializer
{
private readonly ISchemaRegistryClient schemaRegistryClient;
private readonly AvroSerializerConfig serializerConfig;
Expand Down Expand Up @@ -39,7 +41,7 @@ public ConfluentAvroSerializer(
}

/// <inheritdoc/>
public byte[] Serialize(object message)
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
{
return ConfluentSerializerWrapper
.GetOrCreateSerializer(
Expand All @@ -48,11 +50,11 @@ public byte[] Serialize(object message)
typeof(AvroSerializer<>).MakeGenericType(message.GetType()),
this.schemaRegistryClient,
this.serializerConfig))
.Serialize(message);
.SerializeAsync(message, output);
}

/// <inheritdoc/>
public object Deserialize(byte[] data, Type type)
public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
return ConfluentDeserializerWrapper
.GetOrCreateDeserializer(
Expand All @@ -62,7 +64,7 @@ public object Deserialize(byte[] data, Type type)
typeof(AvroDeserializer<>).MakeGenericType(type),
this.schemaRegistryClient,
new AvroDeserializerConfig()))
.Deserialize(data);
.DeserializeAsync(input);
}
}
}
Loading