From c705b290deaeb19b59119325a919bc5ca9b6f0fe Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Tue, 27 Apr 2021 11:25:12 +0100 Subject: [PATCH 1/5] refactor!: refactor the IMessageContext to a more concise structure - The Transform method now creates a new MessageContext containing the new Message - The middlewares executor do not clone the MessageContext any more - Renames the MessageContextConsumer to ConsumerContext and all information about the consumer and consumed message metadata is stored there - Creates the ProducerContext and all information about the producer and produced message metadata is stored there --- .../Handlers/AvroMessageHandler1.cs | 4 +- .../Handlers/AvroMessageHandler2.cs | 4 +- .../KafkaFlow.Sample.BatchConsume/Program.cs | 2 +- .../KafkaFlow.Sample/PrintConsoleHandler.cs | 4 +- .../IConsumerConfigurationBuilder.cs | 2 +- ...ContextConsumer.cs => IConsumerContext.cs} | 29 ++++++- src/KafkaFlow.Abstractions/IMessageContext.cs | 53 +++--------- .../IProducerContext.cs | 23 +++++ src/KafkaFlow.Abstractions/Message.cs | 29 +++++++ .../BatchConsumeExtensions.cs | 2 +- .../BatchConsumeMessageContext.cs | 32 ++----- .../BatchConsumeMiddleware.cs | 4 +- src/KafkaFlow.BatchConsume/WorkerBatch.cs | 14 ++- .../CompressorConsumerMiddleware.cs | 5 +- .../CompressorProducerMiddleware.cs | 7 +- .../Core/Handlers/PauseResumeHandler.cs | 4 +- .../Core/Middlewares/GzipMiddleware.cs | 2 +- .../DefaultMessageTypeResolver.cs | 6 +- .../SerializerConsumerMiddleware.cs | 8 +- .../SerializerProducerMiddleware.cs | 6 +- .../TypedHandlerMiddleware.cs | 6 +- .../BatchConsumeMiddlewareTests.cs | 12 +-- .../BatchConsume/WorkerBatchTests.cs | 16 ++-- .../CompressorConsumerMiddlewareTests.cs | 34 +++++--- .../CompressorProducerMiddlewareTests.cs | 85 +++++++++++++------ ...nsumerTests.cs => ConsumerContextTests.cs} | 9 +- .../KafkaFlow.UnitTests.csproj | 24 +++--- .../SerializerConsumerMiddlewareTests.cs | 36 +++++--- .../SerializerProducerMiddlewareTests.cs | 33 ++++--- src/KafkaFlow/ConsumerMessageContext.cs | 48 ----------- src/KafkaFlow/Consumers/ConsumerContext.cs | 54 ++++++++++++ src/KafkaFlow/Consumers/ConsumerWorker.cs | 21 ++--- .../Consumers/MessageContextConsumer.cs | 53 ------------ src/KafkaFlow/KafkaFlow.csproj | 6 +- src/KafkaFlow/MessageContext.cs | 31 +++++++ src/KafkaFlow/MiddlewareExecutor.cs | 2 +- src/KafkaFlow/ProducerMessageContext.cs | 44 ---------- src/KafkaFlow/Producers/MessageProducer.cs | 68 ++++++++------- src/KafkaFlow/Producers/ProducerContext.cs | 16 ++++ 39 files changed, 450 insertions(+), 388 deletions(-) rename src/KafkaFlow.Abstractions/{IMessageContextConsumer.cs => IConsumerContext.cs} (64%) create mode 100644 src/KafkaFlow.Abstractions/IProducerContext.cs create mode 100644 src/KafkaFlow.Abstractions/Message.cs rename src/KafkaFlow.UnitTests/{MessageContextConsumerTests.cs => ConsumerContextTests.cs} (82%) delete mode 100644 src/KafkaFlow/ConsumerMessageContext.cs create mode 100644 src/KafkaFlow/Consumers/ConsumerContext.cs delete mode 100644 src/KafkaFlow/Consumers/MessageContextConsumer.cs create mode 100644 src/KafkaFlow/MessageContext.cs delete mode 100644 src/KafkaFlow/ProducerMessageContext.cs create mode 100644 src/KafkaFlow/Producers/ProducerContext.cs diff --git a/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler1.cs b/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler1.cs index 145ec6367..aa803bb65 100644 --- a/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler1.cs +++ b/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler1.cs @@ -11,8 +11,8 @@ public Task Handle(IMessageContext context, LogMessages1 message) { Console.WriteLine( "Partition: {0} | Offset: {1} | Message: {2}", - context.Partition, - context.Offset, + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, message.Severity.ToString()); return Task.CompletedTask; diff --git a/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler2.cs b/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler2.cs index 04b118d49..9a6aa3ae1 100644 --- a/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler2.cs +++ b/samples/KafkaFlow.Sample.Avro/Handlers/AvroMessageHandler2.cs @@ -11,8 +11,8 @@ public Task Handle(IMessageContext context, LogMessages2 message) { Console.WriteLine( "Partition: {0} | Offset: {1} | Message: {2}", - context.Partition, - context.Offset, + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, message.Message); return Task.CompletedTask; diff --git a/samples/KafkaFlow.Sample.BatchConsume/Program.cs b/samples/KafkaFlow.Sample.BatchConsume/Program.cs index 9f0a27b45..70e4df0c5 100644 --- a/samples/KafkaFlow.Sample.BatchConsume/Program.cs +++ b/samples/KafkaFlow.Sample.BatchConsume/Program.cs @@ -101,7 +101,7 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next) var text = string.Join( '\n', - batch.Select(ctx => ((SampleBatchMessage) ctx.Message).Text)); + batch.Select(ctx => ((SampleBatchMessage) ctx.Message.Value).Text)); Console.WriteLine(text); diff --git a/samples/KafkaFlow.Sample/PrintConsoleHandler.cs b/samples/KafkaFlow.Sample/PrintConsoleHandler.cs index c84a68638..d46d5fb1b 100644 --- a/samples/KafkaFlow.Sample/PrintConsoleHandler.cs +++ b/samples/KafkaFlow.Sample/PrintConsoleHandler.cs @@ -10,8 +10,8 @@ public Task Handle(IMessageContext context, TestMessage message) { Console.WriteLine( "Partition: {0} | Offset: {1} | Message: {2}", - context.Partition, - context.Offset, + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, message.Text); return Task.CompletedTask; diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs index 2f42f526e..cf443e002 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs @@ -108,7 +108,7 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy() IConsumerConfigurationBuilder WithAutoStoreOffsets(); /// - /// The client should call the + /// The client should call the /// /// IConsumerConfigurationBuilder WithManualStoreOffsets(); diff --git a/src/KafkaFlow.Abstractions/IMessageContextConsumer.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs similarity index 64% rename from src/KafkaFlow.Abstractions/IMessageContextConsumer.cs rename to src/KafkaFlow.Abstractions/IConsumerContext.cs index 4ac98a029..76733d147 100644 --- a/src/KafkaFlow.Abstractions/IMessageContextConsumer.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -6,18 +6,43 @@ namespace KafkaFlow /// /// Represents the message consumer /// - public interface IMessageContextConsumer + public interface IConsumerContext { /// /// Gets the consumer unique name defined in configuration /// - string Name { get; } + string ConsumerName { get; } /// /// Gets a CancellationToken that is cancelled when the worker is requested to stop /// CancellationToken WorkerStopped { get; } + /// + /// Gets the worker id that is processing the message + /// + int WorkerId { get; } + + /// + /// Gets the topic associated with the message + /// + string Topic { get; } + + /// + /// Gets the partition associated with the message + /// + int Partition { get; } + + /// + /// Gets the partition offset associated with the message + /// + long Offset { get; } + + /// + /// Gets the consumer group id from kafka consumer that received the message + /// + string GroupId { get; } + /// /// Gets message timestamp. By default is the UTC timestamp when the message was produced /// diff --git a/src/KafkaFlow.Abstractions/IMessageContext.cs b/src/KafkaFlow.Abstractions/IMessageContext.cs index cf514473a..5e987d5ac 100644 --- a/src/KafkaFlow.Abstractions/IMessageContext.cs +++ b/src/KafkaFlow.Abstractions/IMessageContext.cs @@ -6,19 +6,9 @@ namespace KafkaFlow public interface IMessageContext { /// - /// Gets the worker id that is processing the message + /// Gets the message /// - int WorkerId { get; } - - /// - /// Gets the message key - /// - byte[] PartitionKey { get; } - - /// - /// Gets the message value - /// - object Message { get; } + Message Message { get; } /// /// Gets the message headers @@ -26,42 +16,21 @@ public interface IMessageContext IMessageHeaders Headers { get; } /// - /// Gets the topic associated with the message - /// - string Topic { get; } - - /// - /// Gets the partition associated with the message - /// - int? Partition { get; } - - /// - /// Gets the partition offset associated with the message - /// - long? Offset { get; } - - /// - /// Gets the consumer group id from kafka consumer that received the message - /// - string GroupId { get; } - - /// - /// Gets the from the consumed message + /// Gets the from the consumed message /// - IMessageContextConsumer Consumer { get; } + IConsumerContext ConsumerContext { get; } /// - /// Transforms the message to a new value + /// Gets the from the produced message /// - /// New message value - void TransformMessage(object message); + IProducerContext ProducerContext { get; } /// - /// Creates a clone of the current + /// Creates a new with the new message /// - /// - /// A clone of the current - /// - IMessageContext Clone(); + /// The new message key + /// The new message value + /// A new message context containing the new values + IMessageContext TransformMessage(object key, object value); // TODO: maybe a better name? } } diff --git a/src/KafkaFlow.Abstractions/IProducerContext.cs b/src/KafkaFlow.Abstractions/IProducerContext.cs new file mode 100644 index 000000000..886e856e4 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IProducerContext.cs @@ -0,0 +1,23 @@ +namespace KafkaFlow +{ + /// + /// Some producer metadata + /// + public interface IProducerContext + { + /// + /// Gets the topic associated with the message + /// + string Topic { get; } + + /// + /// Gets the partition associated with the message + /// + int? Partition { get; } + + /// + /// Gets the partition offset associated with the message + /// + long? Offset { get; } + } +} diff --git a/src/KafkaFlow.Abstractions/Message.cs b/src/KafkaFlow.Abstractions/Message.cs new file mode 100644 index 000000000..883f63ff4 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Message.cs @@ -0,0 +1,29 @@ +namespace KafkaFlow +{ + /// + /// Represents a Kafka message + /// + public readonly struct Message + { + /// + /// Initializes a new instance of the struct. + /// + /// + /// + public Message(object key, object value) + { + this.Key = key; + this.Value = value; + } + + /// + /// Gets the message key + /// + public object Key { get; } + + /// + /// Gets the message value + /// + public object Value { get; } + } +} diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs b/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs index 7350baa38..f4a641711 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs @@ -38,7 +38,7 @@ public static IReadOnlyCollection GetMessagesBatch(this IMessag { if (context is BatchConsumeMessageContext ctx) { - return (IReadOnlyCollection) ctx.Message; + return (IReadOnlyCollection) ctx.Message.Value; } throw new InvalidOperationException($"This method can only be used on {nameof(BatchConsumeMessageContext)}"); diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs index 9da5297e5..80bcc86be 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs @@ -1,41 +1,27 @@ namespace KafkaFlow.BatchConsume { + using System; using System.Collections.Generic; internal class BatchConsumeMessageContext : IMessageContext { public BatchConsumeMessageContext( - int workerId, - string groupId, - IMessageContextConsumer consumer, + IConsumerContext consumer, IReadOnlyCollection batchMessage) { - this.WorkerId = workerId; - this.GroupId = groupId; - this.Consumer = consumer; - this.Message = batchMessage; + this.ConsumerContext = consumer; + this.Message = new Message(null, batchMessage); } - public int WorkerId { get; } - - public byte[] PartitionKey => null; - - public object Message { get; private set; } + public Message Message { get; } public IMessageHeaders Headers { get; } = new MessageHeaders(); - public string Topic => null; - - public int? Partition => null; - - public long? Offset => null; - - public string GroupId { get; } - - public IMessageContextConsumer Consumer { get; } + public IConsumerContext ConsumerContext { get; } - public void TransformMessage(object message) => this.Message = message; + public IProducerContext ProducerContext => null; - public IMessageContext Clone() => (IMessageContext) this.MemberwiseClone(); + public IMessageContext TransformMessage(object key, object value) => + throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message"); } } diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs index 2bd39d68f..bf82da475 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs @@ -28,10 +28,10 @@ public BatchConsumeMiddleware( public Task Invoke(IMessageContext context, MiddlewareDelegate next) { var workerBatch = this.batches.GetOrAdd( - context.WorkerId, + context.ConsumerContext.WorkerId, _ => this.workerBatchFactory.Create(this.batchSize, this.batchTimeout, this.logHandler)); - context.Consumer.ShouldStoreOffset = false; + context.ConsumerContext.ShouldStoreOffset = false; return workerBatch.AddAsync(context, next); } diff --git a/src/KafkaFlow.BatchConsume/WorkerBatch.cs b/src/KafkaFlow.BatchConsume/WorkerBatch.cs index 6f0eaa687..2bf68d4a0 100644 --- a/src/KafkaFlow.BatchConsume/WorkerBatch.cs +++ b/src/KafkaFlow.BatchConsume/WorkerBatch.cs @@ -62,7 +62,7 @@ private Task ScheduleDispatch(IMessageContext context, MiddlewareDelegate return Task .Delay(this.batchTimeout, this.cancelScheduleTokenSource.Token) .ContinueWith( - async task => + async _ => { await this.semaphore.WaitAsync().ConfigureAwait(false); @@ -85,9 +85,7 @@ private async Task Dispatch(IMessageContext context, MiddlewareDelegate next) try { var batchContext = new BatchConsumeMessageContext( - context.WorkerId, - context.GroupId, - context.Consumer, + context.ConsumerContext, this.batch.ToList()); await next(batchContext).ConfigureAwait(false); @@ -99,16 +97,16 @@ private async Task Dispatch(IMessageContext context, MiddlewareDelegate next) ex, new { - context.Topic, - context.GroupId, - context.WorkerId, + context.ConsumerContext.Topic, + context.ConsumerContext.GroupId, + context.ConsumerContext.WorkerId, }); } finally { foreach (var messageContext in this.batch) { - messageContext.Consumer.StoreOffset(); + messageContext.ConsumerContext.StoreOffset(); } this.dispatchTask = null; diff --git a/src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs b/src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs index c77d1a872..90b14a6b4 100644 --- a/src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs +++ b/src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs @@ -22,16 +22,15 @@ public CompressorConsumerMiddleware(IMessageCompressor compressor) /// public Task Invoke(IMessageContext context, MiddlewareDelegate next) { - if (!(context.Message is byte[] rawData)) + if (!(context.Message.Value is byte[] rawData)) { throw new InvalidOperationException( $"{nameof(context.Message)} must be a byte array to be decompressed and it is '{context.Message.GetType().FullName}'"); } var data = this.compressor.Decompress(rawData); - context.TransformMessage(data); - return next(context); + return next(context.TransformMessage(context.Message.Key, data)); } } } diff --git a/src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs b/src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs index b5b4355a6..6ca603a86 100644 --- a/src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs +++ b/src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs @@ -6,7 +6,7 @@ /// /// Middleware to compress the messages when producing /// - internal class CompressorProducerMiddleware : IMessageMiddleware + public class CompressorProducerMiddleware : IMessageMiddleware { private readonly IMessageCompressor compressor; @@ -22,16 +22,15 @@ public CompressorProducerMiddleware(IMessageCompressor compressor) /// public Task Invoke(IMessageContext context, MiddlewareDelegate next) { - if (!(context.Message is byte[] rawData)) + if (context.Message.Value is not byte[] rawData) { throw new InvalidOperationException( $"{nameof(context.Message)} must be a byte array to be compressed and it is '{context.Message.GetType().FullName}'"); } var data = this.compressor.Compress(rawData); - context.TransformMessage(data); - return next(context); + return next(context.TransformMessage(context.Message.Value, data)); } } } diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs index 87778d409..f58772786 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs @@ -8,13 +8,13 @@ internal class PauseResumeHandler : IMessageHandler { public async Task Handle(IMessageContext context, PauseResumeMessage message) { - context.Consumer.Pause(); + context.ConsumerContext.Pause(); await Task.Delay(Bootstrapper.MaxPollIntervalMs + 1000); MessageStorage.Add(message); - context.Consumer.Resume(); + context.ConsumerContext.Resume(); } } } diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs index 0e49e242e..4a73f4f4d 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs @@ -7,7 +7,7 @@ internal class GzipMiddleware : IMessageMiddleware { public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { - MessageStorage.Add((byte[]) context.Message); + MessageStorage.Add((byte[]) context.Message.Value); await next(context); } } diff --git a/src/KafkaFlow.Serializer/DefaultMessageTypeResolver.cs b/src/KafkaFlow.Serializer/DefaultMessageTypeResolver.cs index b4dc5d739..f12376f6a 100644 --- a/src/KafkaFlow.Serializer/DefaultMessageTypeResolver.cs +++ b/src/KafkaFlow.Serializer/DefaultMessageTypeResolver.cs @@ -27,14 +27,16 @@ public Type OnConsume(IMessageContext context) /// The message context public void OnProduce(IMessageContext context) { - if (context.Message is null) + if (context.Message.Value is null) { return; } + var messageType = context.Message.Value.GetType(); + context.Headers.SetString( MessageType, - $"{context.Message.GetType().FullName}, {context.Message.GetType().Assembly.GetName().Name}"); + $"{messageType.FullName}, {messageType.Assembly.GetName().Name}"); } } } diff --git a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs b/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs index 3b536452b..9a0785851 100644 --- a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs +++ b/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs @@ -40,20 +40,20 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next) return Task.CompletedTask; } - if (context.Message is null) + if (context.Message.Value is null) { return next(context); } - if (!(context.Message is byte[] rawData)) + if (context.Message.Value is not byte[] rawData) { throw new InvalidOperationException( $"{nameof(context.Message)} must be a byte array to be deserialized and it is '{context.Message.GetType().FullName}'"); } - context.TransformMessage(this.serializer.Deserialize(rawData, messageType)); + var data = this.serializer.Deserialize(rawData, messageType); - return next(context); + return next(context.TransformMessage(context.Message.Key, data)); } } } diff --git a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs b/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs index 779fac52e..2ded1e32c 100644 --- a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs +++ b/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs @@ -33,11 +33,9 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next) { this.typeResolver.OnProduce(context); - var data = this.serializer.Serialize(context.Message); + var data = this.serializer.Serialize(context.Message.Value); - context.TransformMessage(data); - - return next(context); + return next(context.TransformMessage(context.Message.Key, data)); } } } diff --git a/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs b/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs index efd1fb3b1..0b4e2a55c 100644 --- a/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs +++ b/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs @@ -23,15 +23,15 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) await Task.WhenAll( this.configuration .HandlerMapping - .GetHandlersTypes(context.Message.GetType()) + .GetHandlersTypes(context.Message.Value.GetType()) .Select( handler => HandlerExecutor - .GetExecutor(context.Message.GetType()) + .GetExecutor(context.Message.Value.GetType()) .Execute( scope.Resolver.Resolve(handler), context, - context.Message))) + context.Message.Value))) .ConfigureAwait(false); } diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index 70a1e747c..a7a980a5c 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -37,30 +37,30 @@ public async Task Invoke_DifferentWorkers_CallAddAsyncForEachOne() var contextWorker1Mock = new Mock(); var contextWorker2Mock = new Mock(); - var consumerContextWorker1Mock = new Mock(); - var consumerContextWorker2Mock = new Mock(); + var consumerContextWorker1Mock = new Mock(); + var consumerContextWorker2Mock = new Mock(); var worker1Batch = new Mock(); var worker2Batch = new Mock(); var nextMock = new Mock(); - contextWorker1Mock + consumerContextWorker1Mock .SetupGet(x => x.WorkerId) .Returns(1); contextWorker1Mock - .SetupGet(x => x.Consumer) + .SetupGet(x => x.ConsumerContext) .Returns(consumerContextWorker1Mock.Object); consumerContextWorker1Mock.SetupSet(x => x.ShouldStoreOffset = false); - contextWorker2Mock + consumerContextWorker2Mock .SetupGet(x => x.WorkerId) .Returns(2); contextWorker2Mock - .SetupGet(x => x.Consumer) + .SetupGet(x => x.ConsumerContext) .Returns(consumerContextWorker2Mock.Object); consumerContextWorker2Mock.SetupSet(x => x.ShouldStoreOffset = false); diff --git a/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs index eff817167..a1c84c452 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs @@ -40,11 +40,11 @@ public void Setup() public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout() { // Arrange - var consumerContext = new Mock(); + var consumerContext = new Mock(); var context = new Mock(); context - .Setup(x => x.Consumer) + .Setup(x => x.ConsumerContext) .Returns(consumerContext.Object); // Act @@ -61,11 +61,11 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout() public async Task AddAsync_ExactlyBatchSize_CallNextInstantly() { // Arrange - var consumerContext = new Mock(); + var consumerContext = new Mock(); var contextMock = new Mock(); contextMock - .Setup(x => x.Consumer) + .Setup(x => x.ConsumerContext) .Returns(consumerContext.Object); // Act @@ -84,11 +84,11 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly() public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeout() { // Arrange - var consumerContext = new Mock(); + var consumerContext = new Mock(); var contextMock = new Mock(); contextMock - .Setup(x => x.Consumer) + .Setup(x => x.ConsumerContext) .Returns(consumerContext.Object); // Act @@ -111,11 +111,11 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou public async Task AddAsync_NextThrowException_LogError() { // Arrange - var consumerContext = new Mock(); + var consumerContext = new Mock(); var contextMock = new Mock(); contextMock - .Setup(x => x.Consumer) + .Setup(x => x.ConsumerContext) .Returns(consumerContext.Object); var ex = new Exception(); diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs index 87cc08cfc..44f2d5016 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs @@ -29,7 +29,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() // Arrange this.contextMock .SetupGet(x => x.Message) - .Returns(new object()); + .Returns(new Message(new object(), new object())); // Act Func act = () => this.target.Invoke(this.contextMock.Object, _ => this.SetNextCalled()); @@ -37,7 +37,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() // Assert act.Should().Throw(); this.nextCalled.Should().BeFalse(); - this.contextMock.Verify(x => x.TransformMessage(It.IsAny()), Times.Never); + this.contextMock.Verify(x => x.TransformMessage(It.IsAny(), It.IsAny()), Times.Never); this.compressorMock.Verify(x => x.Decompress(It.IsAny()), Times.Never); } @@ -45,23 +45,37 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() public async Task Invoke_ValidMessage_CallNext() { // Arrange - var rawMessage = new byte[1]; - var decompressed = new byte[1]; + var compressedMessage = new Message(null, new byte[1]); + var uncompressedValue = new byte[1]; + + var transformedContextMock = new Mock(); + IMessageContext resultContext = null; this.contextMock .SetupGet(x => x.Message) - .Returns(rawMessage); + .Returns(compressedMessage); this.compressorMock - .Setup(x => x.Decompress(rawMessage)) - .Returns(decompressed); + .Setup(x => x.Decompress((byte[]) compressedMessage.Value)) + .Returns(uncompressedValue); + + this.contextMock + .Setup(x => x.TransformMessage(compressedMessage.Key, uncompressedValue)) + .Returns(transformedContextMock.Object); // Act - await this.target.Invoke(this.contextMock.Object, _ => this.SetNextCalled()); + await this.target.Invoke( + this.contextMock.Object, + ctx => + { + resultContext = ctx; + return Task.CompletedTask; + }); // Assert - this.nextCalled.Should().BeTrue(); - this.contextMock.Verify(x => x.TransformMessage(decompressed)); + resultContext.Should().NotBeNull(); + resultContext.Should().Be(transformedContextMock.Object); + this.contextMock.VerifyAll(); this.compressorMock.VerifyAll(); } diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs index a4cfb7835..8f5b1d98c 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs @@ -1,7 +1,9 @@ namespace KafkaFlow.UnitTests.Compressors { + using System; using System.Threading.Tasks; using FluentAssertions; + using KafkaFlow.Compressor; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -9,56 +11,83 @@ namespace KafkaFlow.UnitTests.Compressors internal class CompressorProducerMiddlewareTests { private Mock contextMock; - private Mock serializerMock; - private Mock typeResolverMock; - private bool nextCalled; - private SerializerProducerMiddleware target; + private Mock compressorMock; + + private CompressorProducerMiddleware target; [TestInitialize] public void Setup() { this.contextMock = new Mock(); - this.serializerMock = new Mock(); - this.typeResolverMock = new Mock(); + this.compressorMock = new Mock(); - this.target = new SerializerProducerMiddleware( - this.serializerMock.Object, - this.typeResolverMock.Object); + this.target = new CompressorProducerMiddleware(this.compressorMock.Object); } [TestMethod] - public async Task Invoke_ValidMessage_CallNext() + public async Task Invoke_InvalidMessage_Throws() { // Arrange - var rawMessage = new byte[1]; - var deserializedMessage = new object(); + var uncompressedMessage = new Message(new byte[1], new object()); + IMessageContext resultContext = null; this.contextMock .SetupGet(x => x.Message) - .Returns(deserializedMessage); + .Returns(uncompressedMessage); - this.typeResolverMock.Setup(x => x.OnProduce(this.contextMock.Object)); + // Act + Func act = () => this.target.Invoke( + this.contextMock.Object, + ctx => + { + resultContext = ctx; + return Task.CompletedTask; + }); - this.serializerMock - .Setup(x => x.Serialize(deserializedMessage)) - .Returns(rawMessage); + // Assert + await act.Should().ThrowAsync(); + resultContext.Should().BeNull(); + this.contextMock.Verify(x => x.TransformMessage(It.IsAny(), It.IsAny()), Times.Never); + this.compressorMock.Verify(x => x.Compress(It.IsAny()), Times.Never); + } - this.contextMock.Setup(x => x.TransformMessage(rawMessage)); + [TestMethod] + public async Task Invoke_ValidMessage_Compress() + { + // Arrange + var uncompressedValue = new byte[1]; + var compressedValue = new byte[1]; + var uncompressedMessage = new Message(new byte[1], uncompressedValue); + + var transformedContextMock = new Mock(); + IMessageContext resultContext = null; + + this.contextMock + .SetupGet(x => x.Message) + .Returns(uncompressedMessage); + + this.compressorMock + .Setup(x => x.Compress(uncompressedValue)) + .Returns(compressedValue); + + this.contextMock + .Setup(x => x.TransformMessage(uncompressedMessage.Key, compressedValue)) + .Returns(transformedContextMock.Object); // Act - await this.target.Invoke(this.contextMock.Object, _ => this.SetNextCalled()); + await this.target.Invoke( + this.contextMock.Object, + ctx => + { + resultContext = ctx; + return Task.CompletedTask; + }); // Assert - this.nextCalled.Should().BeTrue(); + resultContext.Should().NotBeNull(); + resultContext.Should().Be(transformedContextMock.Object); this.contextMock.VerifyAll(); - this.serializerMock.VerifyAll(); - this.typeResolverMock.VerifyAll(); - } - - private Task SetNextCalled() - { - this.nextCalled = true; - return Task.CompletedTask; + this.compressorMock.VerifyAll(); } } } diff --git a/src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs b/src/KafkaFlow.UnitTests/ConsumerContextTests.cs similarity index 82% rename from src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs rename to src/KafkaFlow.UnitTests/ConsumerContextTests.cs index e25ffdec9..f0844f7bd 100644 --- a/src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs +++ b/src/KafkaFlow.UnitTests/ConsumerContextTests.cs @@ -8,7 +8,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] - internal class MessageContextConsumerTests + internal class ConsumerContextTests { [TestMethod] public void MessageTimestamp_ConsumeResultHasMessageTimestamp_ReturnsMessageTimestampFromResult() @@ -30,7 +30,12 @@ public void MessageTimestamp_ConsumeResultHasMessageTimestamp_ReturnsMessageTime }, }; - var target = new MessageContextConsumer(null, null, consumerResult, CancellationToken.None); + var target = new ConsumerContext( + null, + null, + consumerResult, + CancellationToken.None, + 0); // Act var messageTimestamp = target.MessageTimestamp; diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 4e1ef0131..dcfb8cd64 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -6,21 +6,21 @@ - - - - - - - + + + + + + + - - - - - + + + + + diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index 671a2e52d..c3435a383 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -43,7 +43,7 @@ public async Task Invoke_NullMessageType_ReturnWithoutCallingNext() // Assert this.nextCalled.Should().BeFalse(); this.typeResolverMock.VerifyAll(); - this.contextMock.Verify(x => x.TransformMessage(It.IsAny()), Times.Never); + this.contextMock.Verify(x => x.TransformMessage(It.IsAny(), It.IsAny()), Times.Never); this.serializerMock.Verify(x => x.Deserialize(It.IsAny(), It.IsAny()), Times.Never); } @@ -55,7 +55,7 @@ public async Task Invoke_NullMessage_CallNext() this.contextMock .SetupGet(x => x.Message) - .Returns((byte[]) null); + .Returns(new Message(null, null)); this.typeResolverMock .Setup(x => x.OnConsume(this.contextMock.Object)) @@ -78,7 +78,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() this.contextMock .SetupGet(x => x.Message) - .Returns(new TestMessage()); + .Returns(new Message(null, new TestMessage())); this.typeResolverMock .Setup(x => x.OnConsume(this.contextMock.Object)) @@ -90,37 +90,53 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() // Assert act.Should().Throw(); this.nextCalled.Should().BeFalse(); - this.contextMock.Verify(x => x.TransformMessage(It.IsAny()), Times.Never); + this.contextMock.Verify(x => x.TransformMessage(It.IsAny(), It.IsAny()), Times.Never); this.serializerMock.Verify(x => x.Deserialize(It.IsAny(), It.IsAny()), Times.Never); this.typeResolverMock.VerifyAll(); } [TestMethod] - public async Task Invoke_ValidMessage_CallNext() + public async Task Invoke_ValidMessage_Deserialize() { // Arrange var messageType = typeof(TestMessage); - var rawMessage = new byte[1]; + var rawKey = new byte[1]; + var rawValue = new byte[1]; + var rawMessage = new Message(rawKey, rawValue); var deserializedMessage = new TestMessage(); + var transformedContextMock = new Mock(); + IMessageContext resultContext = null; + this.contextMock .SetupGet(x => x.Message) .Returns(rawMessage); + this.contextMock + .Setup(x => x.TransformMessage(rawKey, deserializedMessage)) + .Returns(transformedContextMock.Object); + this.typeResolverMock .Setup(x => x.OnConsume(this.contextMock.Object)) .Returns(messageType); this.serializerMock - .Setup(x => x.Deserialize(rawMessage, messageType)) + .Setup(x => x.Deserialize(rawValue, messageType)) .Returns(deserializedMessage); // Act - await this.target.Invoke(this.contextMock.Object, _ => this.SetNextCalled()); + await this.target.Invoke( + this.contextMock.Object, + ctx => + { + resultContext = ctx; + return Task.CompletedTask; + }); // Assert - this.nextCalled.Should().BeTrue(); - this.contextMock.Verify(x => x.TransformMessage(deserializedMessage)); + resultContext.Should().NotBeNull(); + resultContext.Should().Be(transformedContextMock.Object); + this.contextMock.VerifyAll(); this.serializerMock.VerifyAll(); this.typeResolverMock.VerifyAll(); } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index aa7009e57..f2c167b53 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -12,8 +12,6 @@ internal class SerializerProducerMiddlewareTests private Mock serializerMock; private Mock typeResolverMock; - private bool nextCalled; - private SerializerProducerMiddleware target; [TestInitialize] @@ -29,11 +27,15 @@ public void Setup() } [TestMethod] - public async Task Invoke_ValidMessage_CallNext() + public async Task Invoke_ValidMessage_Serialize() { // Arrange var rawMessage = new byte[1]; - var deserializedMessage = new TestMessage(); + var key = new object(); + var deserializedMessage = new Message(key, new TestMessage()); + IMessageContext resultContext = null; + + var transformedContextMock = new Mock(); this.contextMock .SetupGet(x => x.Message) @@ -42,27 +44,30 @@ public async Task Invoke_ValidMessage_CallNext() this.typeResolverMock.Setup(x => x.OnProduce(this.contextMock.Object)); this.serializerMock - .Setup(x => x.Serialize(deserializedMessage)) + .Setup(x => x.Serialize(deserializedMessage.Value)) .Returns(rawMessage); - this.contextMock.Setup(x => x.TransformMessage(rawMessage)); + this.contextMock + .Setup(x => x.TransformMessage(key, rawMessage)) + .Returns(transformedContextMock.Object); // Act - await this.target.Invoke(this.contextMock.Object, _ => this.SetNextCalled()); + await this.target.Invoke( + this.contextMock.Object, + ctx => + { + resultContext = ctx; + return Task.CompletedTask; + }); // Assert - this.nextCalled.Should().BeTrue(); + resultContext.Should().NotBeNull(); + resultContext.Should().Be(transformedContextMock.Object); this.contextMock.VerifyAll(); this.serializerMock.VerifyAll(); this.typeResolverMock.VerifyAll(); } - private Task SetNextCalled() - { - this.nextCalled = true; - return Task.CompletedTask; - } - private class TestMessage { } diff --git a/src/KafkaFlow/ConsumerMessageContext.cs b/src/KafkaFlow/ConsumerMessageContext.cs deleted file mode 100644 index 179988a31..000000000 --- a/src/KafkaFlow/ConsumerMessageContext.cs +++ /dev/null @@ -1,48 +0,0 @@ -namespace KafkaFlow -{ - using Confluent.Kafka; - - internal class ConsumerMessageContext : IMessageContext - { - private readonly ConsumeResult result; - - public ConsumerMessageContext( - IMessageContextConsumer consumer, - ConsumeResult result, - int workerId, - string groupId) - { - this.result = result; - this.Consumer = consumer; - this.Message = result.Message.Value; - this.Headers = new MessageHeaders(result.Message.Headers); - this.WorkerId = workerId; - this.GroupId = groupId; - } - - public int WorkerId { get; } - - public byte[] PartitionKey => this.result.Message.Key; - - public object Message { get; private set; } - - public IMessageHeaders Headers { get; } - - public string Topic => this.result.Topic; - - public string GroupId { get; } - - public int? Partition => this.result.Partition.Value; - - public long? Offset => this.result.Offset.Value; - - public IMessageContextConsumer Consumer { get; } - - public void TransformMessage(object message) - { - this.Message = message; - } - - public IMessageContext Clone() => (IMessageContext) this.MemberwiseClone(); - } -} diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs new file mode 100644 index 000000000..59eb3415a --- /dev/null +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -0,0 +1,54 @@ +namespace KafkaFlow.Consumers +{ + using System; + using System.Threading; + using Confluent.Kafka; + + internal class ConsumerContext : IConsumerContext + { + private readonly IConsumer consumer; + private readonly IOffsetManager offsetManager; + private readonly ConsumeResult kafkaResult; + + public ConsumerContext( + IConsumer consumer, + IOffsetManager offsetManager, + ConsumeResult kafkaResult, + CancellationToken workerStopped, + int workerId) + { + this.WorkerStopped = workerStopped; + this.WorkerId = workerId; + this.consumer = consumer; + this.offsetManager = offsetManager; + this.kafkaResult = kafkaResult; + } + + public string ConsumerName => this.consumer.Configuration.ConsumerName; + + public CancellationToken WorkerStopped { get; } + + public int WorkerId { get; } + + public string Topic => this.kafkaResult.Topic; + + public int Partition => this.kafkaResult.Partition.Value; + + public long Offset => this.kafkaResult.Offset.Value; + + public string GroupId => this.consumer.Configuration.GroupId; + + public bool ShouldStoreOffset { get; set; } = true; + + public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime; + + public void StoreOffset() => this.offsetManager.StoreOffset(this.kafkaResult.TopicPartitionOffset); + + public IOffsetsWatermark GetOffsetsWatermark() => + new OffsetsWatermark(this.consumer.GetWatermarkOffsets(this.kafkaResult.TopicPartition)); + + public void Pause() => this.consumer.FlowManager.Pause(this.consumer.Assignment); + + public void Resume() => this.consumer.FlowManager.Resume(this.consumer.Assignment); + } +} diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index f7a3b8f32..24f03c922 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -58,15 +58,16 @@ public Task StartAsync() .ReadAsync(this.stopCancellationTokenSource.Token) .ConfigureAwait(false); - var context = new ConsumerMessageContext( - new MessageContextConsumer( + var context = new MessageContext( + new Message(message.Message.Key, message.Message.Value), + new MessageHeaders(message.Message.Headers), + new ConsumerContext( this.consumer, this.offsetManager, message, - this.stopCancellationTokenSource.Token), - message, - this.Id, - this.consumer.Configuration.GroupId); + this.stopCancellationTokenSource.Token, + this.Id), + null); try { @@ -82,14 +83,14 @@ await this.middlewareExecutor new { context.Message, - context.Topic, - context.PartitionKey, - ConsumerName = context.Consumer.Name, + context.ConsumerContext.Topic, + MessageKey = context.Message.Key, + context.ConsumerContext.ConsumerName, }); } finally { - if (this.consumer.Configuration.AutoStoreOffsets && context.Consumer.ShouldStoreOffset) + if (this.consumer.Configuration.AutoStoreOffsets && context.ConsumerContext.ShouldStoreOffset) { this.offsetManager.StoreOffset(message.TopicPartitionOffset); } diff --git a/src/KafkaFlow/Consumers/MessageContextConsumer.cs b/src/KafkaFlow/Consumers/MessageContextConsumer.cs deleted file mode 100644 index 916d3bfde..000000000 --- a/src/KafkaFlow/Consumers/MessageContextConsumer.cs +++ /dev/null @@ -1,53 +0,0 @@ -namespace KafkaFlow.Consumers -{ - using System; - using System.Threading; - using Confluent.Kafka; - - internal class MessageContextConsumer : IMessageContextConsumer - { - private readonly IConsumer consumer; - private readonly IOffsetManager offsetManager; - private readonly ConsumeResult kafkaResult; - - public MessageContextConsumer( - IConsumer consumer, - IOffsetManager offsetManager, - ConsumeResult kafkaResult, - CancellationToken workerStopped) - { - this.WorkerStopped = workerStopped; - this.consumer = consumer; - this.offsetManager = offsetManager; - this.kafkaResult = kafkaResult; - } - - public string Name => this.consumer.Configuration.ConsumerName; - - public CancellationToken WorkerStopped { get; } - - public bool ShouldStoreOffset { get; set; } = true; - - public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime; - - public void StoreOffset() - { - this.offsetManager.StoreOffset(this.kafkaResult.TopicPartitionOffset); - } - - public IOffsetsWatermark GetOffsetsWatermark() - { - return new OffsetsWatermark(this.consumer.GetWatermarkOffsets(this.kafkaResult.TopicPartition)); - } - - public void Pause() - { - this.consumer.FlowManager.Pause(this.consumer.Assignment); - } - - public void Resume() - { - this.consumer.FlowManager.Resume(this.consumer.Assignment); - } - } -} diff --git a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj index adf157aa6..191301286 100644 --- a/src/KafkaFlow/KafkaFlow.csproj +++ b/src/KafkaFlow/KafkaFlow.csproj @@ -7,12 +7,12 @@ - - + + - + diff --git a/src/KafkaFlow/MessageContext.cs b/src/KafkaFlow/MessageContext.cs new file mode 100644 index 000000000..860262102 --- /dev/null +++ b/src/KafkaFlow/MessageContext.cs @@ -0,0 +1,31 @@ +namespace KafkaFlow +{ + internal class MessageContext : IMessageContext + { + public MessageContext( + Message message, + IMessageHeaders headers, + IConsumerContext consumer, + IProducerContext producer) + { + this.Message = message; + this.Headers = headers ?? new MessageHeaders(); + this.ConsumerContext = consumer; + this.ProducerContext = producer; + } + + public Message Message { get; } + + public IConsumerContext ConsumerContext { get; } + + public IProducerContext ProducerContext { get; } + + public IMessageHeaders Headers { get; } + + public IMessageContext TransformMessage(object key, object value) => new MessageContext( + new Message(key, value), + this.Headers, + this.ConsumerContext, + this.ProducerContext); + } +} diff --git a/src/KafkaFlow/MiddlewareExecutor.cs b/src/KafkaFlow/MiddlewareExecutor.cs index 8cbcd9b85..aff62eea1 100644 --- a/src/KafkaFlow/MiddlewareExecutor.cs +++ b/src/KafkaFlow/MiddlewareExecutor.cs @@ -36,7 +36,7 @@ private Task ExecuteDefinition( context, nextContext => this.ExecuteDefinition( index + 1, - nextContext.Clone(), + nextContext, nextOperation)); } } diff --git a/src/KafkaFlow/ProducerMessageContext.cs b/src/KafkaFlow/ProducerMessageContext.cs deleted file mode 100644 index dfcc9fdd6..000000000 --- a/src/KafkaFlow/ProducerMessageContext.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace KafkaFlow -{ - internal class ProducerMessageContext : IMessageContext - { - public ProducerMessageContext( - object message, - byte[] partitionKey, - IMessageHeaders headers, - string topic) - { - this.Message = message; - this.PartitionKey = partitionKey; - this.Headers = headers ?? new MessageHeaders(); - this.Topic = topic; - this.Offset = null; - this.Partition = null; - } - - public int WorkerId => 0; - - public byte[] PartitionKey { get; } - - public object Message { get; private set; } - - public IMessageHeaders Headers { get; } - - public string Topic { get; } - - public string GroupId => null; - - public int? Partition { get; set; } - - public long? Offset { get; set; } - - public IMessageContextConsumer Consumer => null; - - public void TransformMessage(object message) - { - this.Message = message; - } - - public IMessageContext Clone() => (IMessageContext) this.MemberwiseClone(); - } -} diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 86a950ef5..2b233a16f 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -47,15 +47,15 @@ public async Task> ProduceAsync( await this.middlewareExecutor .Execute( - new ProducerMessageContext( - message, - messageKey, + new MessageContext( + new Message(messageKey, message), headers, - topic), + null, + new ProducerContext(topic)), async context => { report = await this - .InternalProduceAsync((ProducerMessageContext) context) + .InternalProduceAsync(context) .ConfigureAwait(false); }) .ConfigureAwait(false); @@ -91,17 +91,17 @@ public void Produce( var messageKey = partitionKey is null ? null : Encoding.UTF8.GetBytes(partitionKey); this.middlewareExecutor.Execute( - new ProducerMessageContext( - message, - messageKey, + new MessageContext( + new Message(messageKey, message), headers, - topic), + null, + new ProducerContext(topic)), context => { var completionSource = new TaskCompletionSource(); this.InternalProduce( - (ProducerMessageContext) context, + context, report => { if (report.Error.IsError) @@ -146,27 +146,37 @@ public void Dispose() this.producer?.Dispose(); } - private static Message CreateMessage(IMessageContext context) + private static void FillContextWithResultMetadata(IMessageContext context, DeliveryResult result) { - return new() - { - Key = context.PartitionKey, - Value = GetMessageContent(context), - Headers = ((MessageHeaders) context.Headers).GetKafkaHeaders(), - Timestamp = Timestamp.Default, - }; + var concreteProducerContext = (ProducerContext) context.ProducerContext; + + concreteProducerContext.Offset = result.Offset; + concreteProducerContext.Partition = result.Partition; } - private static byte[] GetMessageContent(IMessageContext context) + private static Message CreateMessage(IMessageContext context) { - if (!(context.Message is byte[] value)) + if (!(context.Message.Value is byte[] value)) + { + throw new InvalidOperationException( + $"The message value must be a byte array to be produced, it is a {context.Message.Value.GetType().FullName}." + + "You should serialize or encode your message object using a middleware"); + } + + if (!(context.Message.Key is byte[] key)) { throw new InvalidOperationException( - $"{nameof(context.Message)} must be a byte array to be produced, it is a {context.Message.GetType().FullName}." + + $"The message key must be a byte array to be produced, it is a {context.Message.Key.GetType().FullName}." + "You should serialize or encode your message object using a middleware"); } - return value; + return new() + { + Key = key, + Value = value, + Headers = ((MessageHeaders) context.Headers).GetKafkaHeaders(), + Timestamp = Timestamp.Default, + }; } private IProducer EnsureProducer() @@ -228,7 +238,7 @@ private void InvalidateProducer(Error error, DeliveryResult resu new { Error = error }); } - private async Task> InternalProduceAsync(ProducerMessageContext context) + private async Task> InternalProduceAsync(IMessageContext context) { DeliveryResult result = null; @@ -237,7 +247,7 @@ private async Task> InternalProduceAsync(Producer result = await this .EnsureProducer() .ProduceAsync( - context.Topic, + context.ProducerContext.Topic, CreateMessage(context)) .ConfigureAwait(false); } @@ -251,20 +261,19 @@ private async Task> InternalProduceAsync(Producer throw; } - context.Offset = result.Offset; - context.Partition = result.Partition; + FillContextWithResultMetadata(context, result); return result; } private void InternalProduce( - ProducerMessageContext context, + IMessageContext context, Action> deliveryHandler) { this .EnsureProducer() .Produce( - context.Topic, + context.ProducerContext.Topic, CreateMessage(context), report => { @@ -273,8 +282,7 @@ private void InternalProduce( this.InvalidateProducer(report.Error, report); } - context.Offset = report.Offset; - context.Partition = report.Partition; + FillContextWithResultMetadata(context, report); deliveryHandler(report); }); diff --git a/src/KafkaFlow/Producers/ProducerContext.cs b/src/KafkaFlow/Producers/ProducerContext.cs new file mode 100644 index 000000000..b42ffceea --- /dev/null +++ b/src/KafkaFlow/Producers/ProducerContext.cs @@ -0,0 +1,16 @@ +namespace KafkaFlow.Producers +{ + internal class ProducerContext : IProducerContext + { + public ProducerContext(string topic) + { + this.Topic = topic; + } + + public string Topic { get; } + + public int? Partition { get; set; } + + public long? Offset { get; set; } + } +} From ae9ebedf1a1deab6e6b4015cad6733d12e860473 Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Wed, 28 Apr 2021 11:38:01 +0100 Subject: [PATCH 2/5] test: change test classes to be public --- src/KafkaFlow.IntegrationTests/CompressionSerializationTest.cs | 2 +- src/KafkaFlow.IntegrationTests/CompressionTest.cs | 2 +- src/KafkaFlow.IntegrationTests/ConsumerTest.cs | 2 +- src/KafkaFlow.IntegrationTests/SerializationTest.cs | 2 +- .../BatchConsume/BatchConsumeMiddlewareTests.cs | 2 +- src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs | 2 +- .../Compressors/CompressorConsumerMiddlewareTests.cs | 2 +- .../Compressors/CompressorProducerMiddlewareTests.cs | 2 +- .../ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs | 2 +- .../ConfigurationBuilders/ProducerConfigurationBuilderTests.cs | 2 +- src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs | 2 +- src/KafkaFlow.UnitTests/Consumer/WorkerPoolFeederTests.cs | 2 +- src/KafkaFlow.UnitTests/ConsumerContextTests.cs | 2 +- src/KafkaFlow.UnitTests/MessageHeadersTests.cs | 2 +- src/KafkaFlow.UnitTests/OffsetManagerTests.cs | 2 +- src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs | 2 +- .../Serializers/SerializerConsumerMiddlewareTests.cs | 2 +- .../Serializers/SerializerProducerMiddlewareTests.cs | 2 +- src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs | 2 +- 19 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/KafkaFlow.IntegrationTests/CompressionSerializationTest.cs b/src/KafkaFlow.IntegrationTests/CompressionSerializationTest.cs index f90ad5a2e..4358ebe05 100644 --- a/src/KafkaFlow.IntegrationTests/CompressionSerializationTest.cs +++ b/src/KafkaFlow.IntegrationTests/CompressionSerializationTest.cs @@ -13,7 +13,7 @@ namespace KafkaFlow.IntegrationTests using KafkaFlow.Producers; [TestClass] - internal class CompressionSerializationTest + public class CompressionSerializationTest { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.IntegrationTests/CompressionTest.cs b/src/KafkaFlow.IntegrationTests/CompressionTest.cs index 66ab0beb9..6e4898344 100644 --- a/src/KafkaFlow.IntegrationTests/CompressionTest.cs +++ b/src/KafkaFlow.IntegrationTests/CompressionTest.cs @@ -12,7 +12,7 @@ namespace KafkaFlow.IntegrationTests using KafkaFlow.Producers; [TestClass] - internal class CompressionTest + public class CompressionTest { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs index 868e6d49c..e2370565d 100644 --- a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -13,7 +13,7 @@ namespace KafkaFlow.IntegrationTests using KafkaFlow.Producers; [TestClass] - internal class ConsumerTest + public class ConsumerTest { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.IntegrationTests/SerializationTest.cs b/src/KafkaFlow.IntegrationTests/SerializationTest.cs index 9bfbfbf11..a2bfdc06c 100644 --- a/src/KafkaFlow.IntegrationTests/SerializationTest.cs +++ b/src/KafkaFlow.IntegrationTests/SerializationTest.cs @@ -14,7 +14,7 @@ namespace KafkaFlow.IntegrationTests using MessageTypes; [TestClass] - internal class SerializationTest + public class SerializationTest { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index a7a980a5c..bac692e73 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.UnitTests.BatchConsume using Moq; [TestClass] - internal class BatchConsumeMiddlewareTests + public class BatchConsumeMiddlewareTests { private const int BatchSize = 10; private readonly TimeSpan batchTimeout = TimeSpan.FromSeconds(3); diff --git a/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs index a1c84c452..e33e88d3e 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/WorkerBatchTests.cs @@ -8,7 +8,7 @@ namespace KafkaFlow.UnitTests.BatchConsume using Moq; [TestClass] - internal class WorkerBatchTests + public class WorkerBatchTests { private const int BatchSize = 3; diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs index 44f2d5016..09e16a5ed 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs @@ -8,7 +8,7 @@ namespace KafkaFlow.UnitTests.Compressors using Moq; [TestClass] - internal class CompressorConsumerMiddlewareTests + public class CompressorConsumerMiddlewareTests { private Mock contextMock; private Mock compressorMock; diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs index 8f5b1d98c..a08ca7834 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs @@ -8,7 +8,7 @@ namespace KafkaFlow.UnitTests.Compressors using Moq; [TestClass] - internal class CompressorProducerMiddlewareTests + public class CompressorProducerMiddlewareTests { private Mock contextMock; private Mock compressorMock; diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs index 041ee0858..b70dc3b2a 100644 --- a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs +++ b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs @@ -10,7 +10,7 @@ namespace KafkaFlow.UnitTests.ConfigurationBuilders using AutoOffsetReset = KafkaFlow.AutoOffsetReset; [TestClass] - internal class ConsumerConfigurationBuilderTests + public class ConsumerConfigurationBuilderTests { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ProducerConfigurationBuilderTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ProducerConfigurationBuilderTests.cs index 04dd19e8d..a23767619 100644 --- a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ProducerConfigurationBuilderTests.cs +++ b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ProducerConfigurationBuilderTests.cs @@ -9,7 +9,7 @@ namespace KafkaFlow.UnitTests.ConfigurationBuilders using Moq; [TestClass] - internal class ProducerConfigurationBuilderTests + public class ProducerConfigurationBuilderTests { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs b/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs index 65f1ac69e..d08aa60c9 100644 --- a/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs +++ b/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs @@ -12,7 +12,7 @@ namespace KafkaFlow.UnitTests.Consumer using Moq; [TestClass] - internal class ConsumerManagerTests + public class ConsumerManagerTests { private readonly Fixture fixture = new(); diff --git a/src/KafkaFlow.UnitTests/Consumer/WorkerPoolFeederTests.cs b/src/KafkaFlow.UnitTests/Consumer/WorkerPoolFeederTests.cs index 84df8ca78..a1930c73e 100644 --- a/src/KafkaFlow.UnitTests/Consumer/WorkerPoolFeederTests.cs +++ b/src/KafkaFlow.UnitTests/Consumer/WorkerPoolFeederTests.cs @@ -9,7 +9,7 @@ namespace KafkaFlow.UnitTests.Consumer using Moq; [TestClass] - internal class WorkerPoolFeederTests + public class WorkerPoolFeederTests { private WorkerPoolFeeder target; diff --git a/src/KafkaFlow.UnitTests/ConsumerContextTests.cs b/src/KafkaFlow.UnitTests/ConsumerContextTests.cs index f0844f7bd..259dea856 100644 --- a/src/KafkaFlow.UnitTests/ConsumerContextTests.cs +++ b/src/KafkaFlow.UnitTests/ConsumerContextTests.cs @@ -8,7 +8,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] - internal class ConsumerContextTests + public class ConsumerContextTests { [TestMethod] public void MessageTimestamp_ConsumeResultHasMessageTimestamp_ReturnsMessageTimestampFromResult() diff --git a/src/KafkaFlow.UnitTests/MessageHeadersTests.cs b/src/KafkaFlow.UnitTests/MessageHeadersTests.cs index 8cea40c70..ed89bddbd 100644 --- a/src/KafkaFlow.UnitTests/MessageHeadersTests.cs +++ b/src/KafkaFlow.UnitTests/MessageHeadersTests.cs @@ -6,7 +6,7 @@ namespace KafkaFlow.UnitTests using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] - internal class MessageHeadersTests + public class MessageHeadersTests { private const string Key = "abc"; private const string StrValue = "123"; diff --git a/src/KafkaFlow.UnitTests/OffsetManagerTests.cs b/src/KafkaFlow.UnitTests/OffsetManagerTests.cs index 78939e637..a2056740b 100644 --- a/src/KafkaFlow.UnitTests/OffsetManagerTests.cs +++ b/src/KafkaFlow.UnitTests/OffsetManagerTests.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.UnitTests using Moq; [TestClass] - internal class OffsetManagerTests + public class OffsetManagerTests { private Mock committerMock; private TopicPartition topicPartition; diff --git a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs index d001b2d47..47c54f7a9 100644 --- a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs +++ b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs @@ -8,7 +8,7 @@ namespace KafkaFlow.UnitTests using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] - internal class PartitionOffsetsTests + public class PartitionOffsetsTests { [TestMethod] public void AddOffset_InitializeTheValue_DoNothing() diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index c3435a383..dd2372686 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.UnitTests.Serializers using Moq; [TestClass] - internal class SerializerConsumerMiddlewareTests + public class SerializerConsumerMiddlewareTests { private Mock contextMock; private Mock serializerMock; diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index f2c167b53..2e4dfffd8 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -6,7 +6,7 @@ namespace KafkaFlow.UnitTests.Serializers using Moq; [TestClass] - internal class SerializerProducerMiddlewareTests + public class SerializerProducerMiddlewareTests { private Mock contextMock; private Mock serializerMock; diff --git a/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs b/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs index b3e6028fe..48f4ba6ca 100644 --- a/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs +++ b/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs @@ -5,7 +5,7 @@ namespace KafkaFlow.UnitTests.TypedHandler using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] - internal class HandlerTypeMappingTests + public class HandlerTypeMappingTests { private HandlerTypeMapping target; From eb383aba394cba488642a8bef077ca5d06011d8c Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Wed, 28 Apr 2021 11:38:34 +0100 Subject: [PATCH 3/5] style: ignore documentation Warnings to test and sample projects --- .../KafkaFlow.Sample.Avro.csproj | 22 +++++++++++------ .../KafkaFlow.Sample.BatchConsume.csproj | 8 +++++++ .../KafkaFlow.Sample.WebApi.csproj | 24 ++++++++++++------- .../KafkaFlow.Sample/KafkaFlow.Sample.csproj | 8 +++++++ .../KafkaFlow.IntegrationTests.csproj | 8 +++++++ .../KafkaFlow.UnitTests.csproj | 8 +++++++ 6 files changed, 63 insertions(+), 15 deletions(-) diff --git a/samples/KafkaFlow.Sample.Avro/KafkaFlow.Sample.Avro.csproj b/samples/KafkaFlow.Sample.Avro/KafkaFlow.Sample.Avro.csproj index ffb654cd7..341d48967 100644 --- a/samples/KafkaFlow.Sample.Avro/KafkaFlow.Sample.Avro.csproj +++ b/samples/KafkaFlow.Sample.Avro/KafkaFlow.Sample.Avro.csproj @@ -7,17 +7,25 @@ false + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + - - - - - - + + + + + + - + diff --git a/samples/KafkaFlow.Sample.BatchConsume/KafkaFlow.Sample.BatchConsume.csproj b/samples/KafkaFlow.Sample.BatchConsume/KafkaFlow.Sample.BatchConsume.csproj index 431adbb5d..dd50eb76c 100644 --- a/samples/KafkaFlow.Sample.BatchConsume/KafkaFlow.Sample.BatchConsume.csproj +++ b/samples/KafkaFlow.Sample.BatchConsume/KafkaFlow.Sample.BatchConsume.csproj @@ -7,6 +7,14 @@ false + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + diff --git a/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj b/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj index f0d2f2c58..a825f9903 100644 --- a/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj +++ b/samples/KafkaFlow.Sample.WebApi/KafkaFlow.Sample.WebApi.csproj @@ -5,18 +5,26 @@ false + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + - - - - - + + + + + - - - + + + diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index 2ae27c1c2..386681126 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -7,6 +7,14 @@ false + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + diff --git a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj index 09eb74539..05288c24b 100644 --- a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj +++ b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj @@ -5,6 +5,14 @@ false + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index dcfb8cd64..281244db0 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -5,6 +5,14 @@ false + + 1701;1702;CS1591;SA1600 + + + + 1701;1702;CS1591;SA1600 + + From 86f8ea29cd05f32915fbc485f7c2d8dcff09a738 Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Wed, 28 Apr 2021 13:58:48 +0100 Subject: [PATCH 4/5] chore: change commit lint rules --- commitlint.config.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/commitlint.config.js b/commitlint.config.js index 98ee7dfc2..e9e868268 100644 --- a/commitlint.config.js +++ b/commitlint.config.js @@ -1,3 +1,7 @@ -module.exports = { +const Configuration = { extends: ['@commitlint/config-conventional'], -} + rules: { + 'body-max-line-length': [0, 'always'], + }, +}; +module.exports = Configuration; \ No newline at end of file From a93aa26293806b40ec2dfa62c09a2ebf3bb5d1a0 Mon Sep 17 00:00:00 2001 From: Filipe Esch Date: Wed, 28 Apr 2021 15:41:24 +0100 Subject: [PATCH 5/5] refactor: change Confluent serializers to not use dynamic --- .../ConfluentDeserializerWrapper.cs | 56 +++++++++++++++++++ .../ConfluentSerializerWrapper.cs | 56 +++++++++++++++++++ .../KafkaFlow.SchemaRegistry.csproj | 4 +- .../ConfluentAvroSerializer.cs | 48 +++++++--------- .../ConfluentJsonSerializer.cs | 41 ++++++-------- .../ConfluentProtobufSerializer.cs | 37 ++++++------ 6 files changed, 169 insertions(+), 73 deletions(-) create mode 100644 src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs create mode 100644 src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs diff --git a/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs b/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs new file mode 100644 index 000000000..2bb4f52b2 --- /dev/null +++ b/src/KafkaFlow.SchemaRegistry/ConfluentDeserializerWrapper.cs @@ -0,0 +1,56 @@ +namespace KafkaFlow +{ + using System; + using System.Collections.Concurrent; + using Confluent.Kafka; + + /// + /// A wrapper to call the typed Confluent deserializers + /// + public abstract class ConfluentDeserializerWrapper + { + private static readonly ConcurrentDictionary Deserializers = new(); + + /// + /// Get the deserializer based on the target message type + /// + /// The message type + /// A factory that creates a + /// + public static ConfluentDeserializerWrapper GetOrCreateDeserializer( + Type messageType, + Func deserializerFactory) + { + return Deserializers.GetOrAdd( + messageType, + _ => (ConfluentDeserializerWrapper) Activator.CreateInstance( + typeof(InnerConfluentDeserializerWrapper<>).MakeGenericType(messageType), + deserializerFactory)); + } + + /// + /// Deserialize a message using the passed deserializer + /// + /// The message to deserialize + /// + public abstract object Deserialize(byte[] message); + + private class InnerConfluentDeserializerWrapper : ConfluentDeserializerWrapper + { + private readonly IAsyncDeserializer deserializer; + + public InnerConfluentDeserializerWrapper(Func deserializerFactory) + { + this.deserializer = (IAsyncDeserializer) deserializerFactory(); + } + + public override object Deserialize(byte[] message) + { + return this.deserializer + .DeserializeAsync(message, message == null, SerializationContext.Empty) + .GetAwaiter() + .GetResult(); + } + } + } +} diff --git a/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs b/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs new file mode 100644 index 000000000..7b529b820 --- /dev/null +++ b/src/KafkaFlow.SchemaRegistry/ConfluentSerializerWrapper.cs @@ -0,0 +1,56 @@ +namespace KafkaFlow +{ + using System; + using System.Collections.Concurrent; + using Confluent.Kafka; + + /// + /// A wrapper to call the typed Confluent serializers and deserializers + /// + public abstract class ConfluentSerializerWrapper + { + private static readonly ConcurrentDictionary Serializers = new(); + + /// + /// Get the serializer based on the target message type + /// + /// The message type + /// A factory that creates a + /// + public static ConfluentSerializerWrapper GetOrCreateSerializer( + Type messageType, + Func serializerFactory) + { + return Serializers.GetOrAdd( + messageType, + _ => (ConfluentSerializerWrapper) Activator.CreateInstance( + typeof(InnerConfluentSerializerWrapper<>).MakeGenericType(messageType), + serializerFactory)); + } + + /// + /// Serialize a message using the passed serializer + /// + /// The message to serialize + /// + public abstract byte[] Serialize(object message); + + private class InnerConfluentSerializerWrapper : ConfluentSerializerWrapper + { + private readonly IAsyncSerializer serializer; + + public InnerConfluentSerializerWrapper(Func serializerFactory) + { + this.serializer = (IAsyncSerializer) serializerFactory(); + } + + public override byte[] Serialize(object message) + { + return this.serializer + .SerializeAsync((T) message, SerializationContext.Empty) + .GetAwaiter() + .GetResult(); + } + } + } +} diff --git a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj index bedd256e4..54b9b5d04 100644 --- a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj +++ b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj @@ -8,11 +8,11 @@ - + - + diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs index 5e133c8aa..383b7b6b5 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs @@ -1,9 +1,6 @@ namespace KafkaFlow.Serializer.SchemaRegistry { using System; - using Avro.Specific; - using Confluent.Kafka; - using Confluent.Kafka.SyncOverAsync; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; @@ -33,9 +30,10 @@ public ConfluentAvroSerializer( IDependencyResolver resolver, AvroSerializerConfig serializerConfig) { - this.schemaRegistryClient = resolver.Resolve() ?? - throw new InvalidOperationException( - $"No schema registry configuration was found. Set it using {nameof(ClusterConfigurationBuilderExtensions.WithSchemaRegistry)} on cluster configuration"); + this.schemaRegistryClient = + resolver.Resolve() ?? + throw new InvalidOperationException( + $"No schema registry configuration was found. Set it using {nameof(ClusterConfigurationBuilderExtensions.WithSchemaRegistry)} on cluster configuration"); this.serializerConfig = serializerConfig; } @@ -43,32 +41,28 @@ public ConfluentAvroSerializer( /// public byte[] Serialize(object message) { - if (!(message is ISpecificRecord record)) - { - throw new InvalidCastException( - $"The message type {message.GetType().FullName} must implement {nameof(ISpecificRecord)} interface."); - } - - return new AvroSerializer( - this.schemaRegistryClient, - this.serializerConfig) - .AsSyncOverAsync() - .Serialize(record, SerializationContext.Empty); + return ConfluentSerializerWrapper + .GetOrCreateSerializer( + message.GetType(), + () => Activator.CreateInstance( + typeof(AvroSerializer<>).MakeGenericType(message.GetType()), + this.schemaRegistryClient, + this.serializerConfig)) + .Serialize(message); } /// public object Deserialize(byte[] data, Type type) { - dynamic deserializer = Activator - .CreateInstance( - typeof(AvroDeserializer<>).MakeGenericType(type), - this.schemaRegistryClient, - new AvroDeserializerConfig()); - - return deserializer - .DeserializeAsync(data, data == null, SerializationContext.Empty) - .GetAwaiter() - .GetResult(); + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(AvroDeserializer<>).MakeGenericType(type), + this.schemaRegistryClient, + new AvroDeserializerConfig())) + .Deserialize(data); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs index 8d1eab7d2..b19cb1ee4 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Linq; - using Confluent.Kafka; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; using NJsonSchema.Generation; @@ -59,33 +58,29 @@ public ConfluentJsonSerializer( /// public byte[] Serialize(object message) { - dynamic serializer = Activator.CreateInstance( - typeof(JsonSerializer<>).MakeGenericType(message.GetType()), - this.schemaRegistryClient, - this.serializerConfig, - this.schemaGeneratorSettings); - - return serializer - .SerializeAsync(message as dynamic, SerializationContext.Empty) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); + return ConfluentSerializerWrapper + .GetOrCreateSerializer( + message.GetType(), + () => Activator.CreateInstance( + typeof(JsonSerializer<>).MakeGenericType(message.GetType()), + this.schemaRegistryClient, + this.serializerConfig, + this.schemaGeneratorSettings)) + .Serialize(message); } /// public object Deserialize(byte[] message, Type type) { - dynamic deserializer = Activator - .CreateInstance( - typeof(JsonDeserializer<>).MakeGenericType(type), - Enumerable.Empty>(), - null); - - return deserializer - .DeserializeAsync(message, message == null, SerializationContext.Empty) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(JsonDeserializer<>).MakeGenericType(type), + Enumerable.Empty>(), + null)) + .Deserialize(message); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs index d71e7fb0e..7f6f5b309 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; using System.Linq; - using Confluent.Kafka; using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; @@ -42,31 +41,27 @@ public ConfluentProtobufSerializer(IDependencyResolver resolver, ProtobufSeriali /// public byte[] Serialize(object message) { - dynamic serializer = Activator.CreateInstance( - typeof(ProtobufSerializer<>).MakeGenericType(message.GetType()), - this.schemaRegistryClient, - this.serializerConfig); - - return serializer - .SerializeAsync(message as dynamic, SerializationContext.Empty) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); + return ConfluentSerializerWrapper + .GetOrCreateSerializer( + message.GetType(), + () => Activator.CreateInstance( + typeof(ProtobufSerializer<>).MakeGenericType(message.GetType()), + this.schemaRegistryClient, + this.serializerConfig)) + .Serialize(message); } /// public object Deserialize(byte[] message, Type type) { - dynamic deserializer = Activator - .CreateInstance( - typeof(ProtobufDeserializer<>).MakeGenericType(type), - Enumerable.Empty>()); - - return deserializer - .DeserializeAsync(message, message == null, SerializationContext.Empty) - .ConfigureAwait(false) - .GetAwaiter() - .GetResult(); + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(ProtobufDeserializer<>).MakeGenericType(type), + Enumerable.Empty>())) + .Deserialize(message); } } }