diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index c4f50eb1e..0eba9d693 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Sample { using System; + using System.Linq; using System.Threading.Tasks; using global::Microsoft.Extensions.DependencyInjection; using KafkaFlow.Admin; @@ -81,13 +82,17 @@ private static async Task Main() switch (input) { case var _ when int.TryParse(input, out var count): - for (var i = 0; i < count; i++) - { - producers[producerName] - .Produce( - Guid.NewGuid().ToString(), - new TestMessage { Text = $"Message: {Guid.NewGuid()}" }); - } + var result = await producers[producerName] + .BatchProduceAsync( + Enumerable + .Range(0, count) + .Select( + x => new BatchProduceItem( + "test-topic", + Guid.NewGuid().ToString(), + new TestMessage { Text = $"Message: {Guid.NewGuid()}" }, + null)) + .ToList()); break; diff --git a/src/KafkaFlow/Producers/BatchProduceExtension.cs b/src/KafkaFlow/Producers/BatchProduceExtension.cs new file mode 100644 index 000000000..7813c1ecb --- /dev/null +++ b/src/KafkaFlow/Producers/BatchProduceExtension.cs @@ -0,0 +1,130 @@ +namespace KafkaFlow.Producers +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Confluent.Kafka; + + /// + /// + public static class BatchProduceExtension + { + /// + /// Calls the Produce() method in loop for high throughput scenarios + /// + /// + /// All messages to produce + /// indicates if the method should throw a if any message fail + /// A Task that will be marked as completed when all produce operations end + public static Task> BatchProduceAsync( + this IMessageProducer producer, + IReadOnlyCollection items, + bool throwIfAnyProduceFail = true) + { + var completionSource = new TaskCompletionSource>(); + + var pendingProduceCount = items.Count; + var hasErrors = false; + + if (pendingProduceCount == 0) + { + completionSource.SetResult(items); + } + + foreach (var item in items) + { + producer.Produce( + item.Topic, + item.PartitionKey, + item.Message, + item.Headers, + report => + { + item.DeliveryReport = report; + + if (report.Error.IsError) + { + hasErrors = true; + } + + if (Interlocked.Decrement(ref pendingProduceCount) != 0) + { + return; + } + + if (throwIfAnyProduceFail && hasErrors) + { + completionSource.SetException(new BatchProduceException(items)); + } + else + { + completionSource.SetResult(items); + } + }); + } + + return completionSource.Task; + } + } + + /// + /// Represents a message to be produced in batch + /// + public class BatchProduceItem + { + /// + /// The message topic name + /// + public string Topic { get; } + + /// + /// The message partition key + /// + public string PartitionKey { get; } + + /// + /// The message object + /// + public object Message { get; } + + /// + /// The message headers + /// + public IMessageHeaders Headers { get; } + + /// + /// The delivery report after the production + /// + public DeliveryReport DeliveryReport { get; internal set; } + + /// + /// Creates a batch produce item + /// + /// + /// + /// + /// + public BatchProduceItem( + string topic, + string partitionKey, + object message, + IMessageHeaders headers) + { + this.Topic = topic; + this.PartitionKey = partitionKey; + this.Message = message; + this.Headers = headers; + } + } + + public class BatchProduceException : Exception + { + public IReadOnlyCollection Items { get; } + + public BatchProduceException(IReadOnlyCollection items) + { + this.Items = items; + } + } +}