Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/Wolverine/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,34 @@ public void SetMessageType(Type messageType)
/// </summary>
public string? TenantId { get; set; }

private string?[] _acceptedContentTypes = ["application/json"];
private string? _acceptedContentTypesJoined;

/// <summary>
/// Specifies the accepted content types for the requested reply
/// </summary>
public string?[] AcceptedContentTypes { get; set; } = ["application/json"];
public string?[] AcceptedContentTypes
{
get => _acceptedContentTypes;
set
{
_acceptedContentTypes = value;
_acceptedContentTypesJoined = null; // Invalidate cache
}
}

/// <summary>
/// Returns the AcceptedContentTypes as a comma-separated string.
/// This value is cached to avoid repeated allocations during serialization.
/// </summary>
internal string? AcceptedContentTypesJoined
{
get
{
if (_acceptedContentTypes.Length == 0) return null;
return _acceptedContentTypesJoined ??= string.Join(",", _acceptedContentTypes);
}
}

/// <summary>
/// Specific message id for this envelope
Expand Down
133 changes: 104 additions & 29 deletions src/Wolverine/Runtime/Serialization/EnvelopeSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,41 @@
using System.Buffers;
using System.Globalization;
using System.Xml;

namespace Wolverine.Runtime.Serialization;

public static class EnvelopeSerializer
{
// Initial buffer size - will grow as needed
private const int InitialBufferSize = 4096;

// Thread-local buffer to avoid repeated rentals in tight loops
[ThreadStatic]
private static byte[]? t_buffer;

private static byte[] RentBuffer(int minimumSize)
{
var buffer = t_buffer;
if (buffer != null && buffer.Length >= minimumSize)
{
t_buffer = null;
return buffer;
}
return ArrayPool<byte>.Shared.Rent(Math.Max(minimumSize, InitialBufferSize));
}

private static void ReturnBuffer(byte[] buffer)
{
if (buffer.Length <= InitialBufferSize * 4)
{
t_buffer = buffer;
}
else
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void ReadDataElement(Envelope env, string key, string value)
{
try
Expand Down Expand Up @@ -203,42 +234,89 @@ private static Envelope readSingle(BinaryReader br)

public static byte[] Serialize(IList<Envelope> messages)
{
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writer.Write(messages.Count);
foreach (var message in messages) writeSingle(writer, message);
writer.Flush();
return stream.ToArray();
// Estimate size: 4 bytes for count + ~500 bytes per message average
var estimatedSize = 4 + (messages.Count * 500);
var buffer = RentBuffer(estimatedSize);
try
{
using var stream = new MemoryStream(buffer, 0, buffer.Length, writable: true, publiclyVisible: true);
using var writer = new BinaryWriter(stream);
writer.Write(messages.Count);
foreach (var message in messages) writeSingle(writer, message);
writer.Flush();

var length = (int)stream.Position;
var result = new byte[length];
Buffer.BlockCopy(buffer, 0, result, 0, length);
return result;
}
catch (NotSupportedException)
{
// Buffer was too small, fall back to expandable stream
ReturnBuffer(buffer);
Copy link
Contributor

@jorik jorik Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at the code, and saw you're returning the buffer twice @jeremydmiller. Once here in the catched exception, and then again in the finally

This will return the same buffer twice to the pool, causing future Rent() calls to return the same array to 2 different callers, which can cause some nasty issues

using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writer.Write(messages.Count);
foreach (var message in messages) writeSingle(writer, message);
writer.Flush();
return stream.ToArray();
}
finally
{
ReturnBuffer(buffer);
}
}

public static byte[] Serialize(Envelope env)
{
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writeSingle(writer, env);
writer.Flush();
return stream.ToArray();
// Estimate size based on data length + headers (~200 bytes overhead)
var dataLength = env.Data?.Length ?? 0;
var estimatedSize = dataLength + 512;
var buffer = RentBuffer(estimatedSize);
try
{
using var stream = new MemoryStream(buffer, 0, buffer.Length, writable: true, publiclyVisible: true);
using var writer = new BinaryWriter(stream);
writeSingle(writer, env);
writer.Flush();

var length = (int)stream.Position;
var result = new byte[length];
Buffer.BlockCopy(buffer, 0, result, 0, length);
return result;
}
catch (NotSupportedException)
{
// Buffer was too small, fall back to expandable stream
ReturnBuffer(buffer);
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
writeSingle(writer, env);
writer.Flush();
return stream.ToArray();
}
finally
{
ReturnBuffer(buffer);
}
}

private static void writeSingle(BinaryWriter writer, Envelope env)
{
writer.Write(env.SentAt.UtcDateTime.ToBinary());

writer.Flush();

using (var headerData = new MemoryStream())
{
using (var headerWriter = new BinaryWriter(headerData))
{
var count = writeHeaders(headerWriter, env);
headerWriter.Flush();
// Write placeholder for header count, remember position
var countPosition = writer.BaseStream.Position;
writer.Write(0); // placeholder

writer.Write(count);
// Write headers directly to the stream
var count = writeHeaders(writer, env);

headerData.Position = 0;
headerData.CopyTo(writer.BaseStream);
}
}
// Go back and write the actual count
var currentPosition = writer.BaseStream.Position;
writer.BaseStream.Position = countPosition;
writer.Write(count);
writer.BaseStream.Position = currentPosition;

writer.Write(env.Data!.Length);
writer.Write(env.Data);
Expand All @@ -261,11 +339,8 @@ private static int writeHeaders(BinaryWriter writer, Envelope env)
writer.WriteProp(ref count, EnvelopeConstants.TopicNameKey, env.TopicName);


if (env.AcceptedContentTypes.Length != 0)
{
writer.WriteProp(ref count, EnvelopeConstants.AcceptedContentTypesKey,
string.Join(",", env.AcceptedContentTypes));
}
// Use cached joined string to avoid allocation on every serialization
writer.WriteProp(ref count, EnvelopeConstants.AcceptedContentTypesKey, env.AcceptedContentTypesJoined);

writer.WriteProp(ref count, EnvelopeConstants.IdKey, env.Id);
writer.WriteProp(ref count, EnvelopeConstants.ReplyRequestedKey, env.ReplyRequested);
Expand Down
Loading