Skip to content

Commit

Permalink
[Host] Batch produce API
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Nov 7, 2024
1 parent ee36ea4 commit dca4345
Show file tree
Hide file tree
Showing 56 changed files with 898 additions and 729 deletions.
25 changes: 25 additions & 0 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- [Configuration](#configuration)
- [Pub/Sub communication](#pubsub-communication)
- [Producer](#producer)
- [Bulk (Batch) Publish](#bulk-batch-publish)
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
Expand Down Expand Up @@ -159,6 +160,30 @@ await bus.Publish(msg, cancellationToken: ct);

> The transport plugins might introduce additional configuration options. Please check the relevant provider docs. For example, Azure Service Bus, Azure Event Hub and Kafka allow setting the partitioning key for a given message type.
#### Bulk (Batch) Publish

Several transports support publishing messages in bulk, including:

- **Azure Service Bus**
- **Azure Event Hub**
- **RabbitMQ**

To publish messages in bulk, pass a collection of message instances of the specified type, as shown below:

```csharp
// Assuming IMessageBus bus;
IEnumerable<SomeMessage> messages = [ ];
await bus.Publish(messages);
```

Any collection type that can be converted to `IEnumerable<T>` is supported.

While there’s no upper limit enforced by SMB, be aware that the underlying transport may split messages into chunks to avoid exceeding its payload limits in a single publish operation. These chunks will retain the original message order.

For transports that don’t natively support bulk publishing, messages are published individually in sequence.

> **Note:** The [producer interceptor](#interceptors) pipeline is not executed during bulk publishing. This behavior may change in future updates.
#### Set message headers

> Since version 1.15.0
Expand Down
25 changes: 25 additions & 0 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- [Configuration](#configuration)
- [Pub/Sub communication](#pubsub-communication)
- [Producer](#producer)
- [Bulk (Batch) Publish](#bulk-batch-publish)
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
Expand Down Expand Up @@ -159,6 +160,30 @@ await bus.Publish(msg, cancellationToken: ct);

> The transport plugins might introduce additional configuration options. Please check the relevant provider docs. For example, Azure Service Bus, Azure Event Hub and Kafka allow setting the partitioning key for a given message type.
#### Bulk (Batch) Publish

Several transports support publishing messages in bulk, including:

- **Azure Service Bus**
- **Azure Event Hub**
- **RabbitMQ**

To publish messages in bulk, pass a collection of message instances of the specified type, as shown below:

```csharp
// Assuming IMessageBus bus;
IEnumerable<SomeMessage> messages = [ ];
await bus.Publish(messages);
```

Any collection type that can be converted to `IEnumerable<T>` is supported.

While there’s no upper limit enforced by SMB, be aware that the underlying transport may split messages into chunks to avoid exceeding its payload limits in a single publish operation. These chunks will retain the original message order.

For transports that don’t natively support bulk publishing, messages are published individually in sequence.

> **Note:** The [producer interceptor](#interceptors) pipeline is not executed during bulk publishing. This behavior may change in future updates.
#### Set message headers

> Since version 1.15.0
Expand Down
97 changes: 53 additions & 44 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,99 +104,108 @@ protected override async Task OnStart()
}
}

protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
private EventData GetTransportMessage(object message, Type messageType, IDictionary<string, object> messageHeaders, string path, out string partitionKey)
{
OnProduceToTransport(message, messageType, path, messageHeaders);

var messagePayload = message != null
? Serializer.Serialize(messageType, message)
: null;

var transportMessage = message != null
? new EventData(messagePayload)
: new EventData();

if (messageHeaders != null)
{
foreach (var header in messageHeaders)
{
transportMessage.Properties.Add(header.Key, header.Value);
}
}

partitionKey = messageType != null
? GetPartitionKey(messageType, message)
: null;

return transportMessage;
}

public override Task ProduceToTransport(object message, Type messageType, string path, IDictionary<string, object> messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
var transportMessage = GetTransportMessage(message, messageType, messageHeaders, path, out var partitionKey);
var producer = _producerByPath[path];
return producer.SendAsync([transportMessage], new SendEventOptions { PartitionKey = partitionKey }, cancellationToken);
}

public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
AssertActive();

var dispatched = new List<T>(envelopes.Count);
try
{
var producer = _producerByPath[path];

var messagesByPartition = envelopes
.Where(x => x.Message != null)
.Select(envelope =>
{
var messageType = envelope.Message?.GetType();
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
_logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);
var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();
if (envelope.Headers != null)
{
foreach (var header in envelope.Headers)
{
ev.Properties.Add(header.Key, header.Value);
}
}
var partitionKey = messageType != null
? GetPartitionKey(messageType, envelope.Message)
: null;
return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
var transportMessage = GetTransportMessage(envelope.Message, envelope.MessageType, envelope.Headers, path, out var partitionKey);
return (Envelope: envelope, TransportMessage: transportMessage, PartitionKey: partitionKey);
})
.GroupBy(x => x.PartitionKey);

var producer = _producerByPath[path];
var inBatch = new List<T>(envelopes.Count);
foreach (var partition in messagesByPartition)
{
var batchOptions = new CreateBatchOptions { PartitionKey = partition.Key };
EventDataBatch batch = null;
try
{
var items = partition.ToList();
if (items.Count == 1)
using var it = partition.GetEnumerator();
var advance = it.MoveNext();
while (advance)
{
// only one item - quicker to send on its own
var item = items.Single();
await producer.SendAsync([item.Message], new SendEventOptions { PartitionKey = partition.Key }, cancellationToken);

dispatched.Add(item.Envelope);
continue;
}
var item = it.Current;

// multiple items - send in batches
var inBatch = new List<T>(items.Count);
var i = 0;
while (i < items.Count)
{
var item = items[i];
batch ??= await producer.CreateBatchAsync(new CreateBatchOptions { PartitionKey = partition.Key }, cancellationToken);
if (batch.TryAdd(item.Message))
batch ??= await producer.CreateBatchAsync(batchOptions, cancellationToken);
if (batch.TryAdd(item.TransportMessage))
{
inBatch.Add(item.Envelope);
if (++i < items.Count)
advance = it.MoveNext();
if (advance)
{
continue;
}
}

if (batch.Count == 0)
{
throw new ProducerMessageBusException($"Failed to add message {item.Envelope.Message} of Type {item.Envelope.MessageType?.Name} on Path {path} to an empty batch");
throw new ProducerMessageBusException($"Failed to add message {item.Envelope.Message} of type {item.Envelope.MessageType?.Name} on path {path} to an empty batch");
}

advance = false;
await producer.SendAsync(batch, cancellationToken).ConfigureAwait(false);
dispatched.AddRange(inBatch);
inBatch.Clear();

batch.Dispose();
batch = null;
}

return new(dispatched, null);
}
finally
{
batch?.Dispose();
}
}
return new(dispatched, null);
}
catch (Exception ex)
{
return new(dispatched, ex);
}

return new(dispatched, null);
}

#endregion
Expand Down
Loading

0 comments on commit dca4345

Please sign in to comment.