Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding System.IO.Pipelines to replace Channels and Streams (replaces #949) #1199

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions projects/Benchmarks/ArrayBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// We only need this if we aren't targeting .NET 6.0 or greater since it already exists there
#if !NET6_0_OR_GREATER
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Buffers
{
public class ArrayBufferWriter<T> : IBufferWriter<T>, IDisposable
{
private T[] _rentedBuffer;
private int _written;
private long _committed;

private const int MinimumBufferSize = 256;

public ArrayBufferWriter(int initialCapacity = MinimumBufferSize)
{
if (initialCapacity <= 0)
{
throw new ArgumentException(null, nameof(initialCapacity));
}

_rentedBuffer = ArrayPool<T>.Shared.Rent(initialCapacity);
_written = 0;
_committed = 0;
}

public Memory<T> WrittenMemory
{
get
{
CheckIfDisposed();

return _rentedBuffer.AsMemory(0, _written);
}
}

public Span<T> WrittenSpan
{
get
{
CheckIfDisposed();

return _rentedBuffer.AsSpan(0, _written);
}
}

public int BytesWritten
{
get
{
CheckIfDisposed();

return _written;
}
}

public long BytesCommitted
{
get
{
CheckIfDisposed();

return _committed;
}
}

public void Clear()
{
CheckIfDisposed();

ClearHelper();
}

private void ClearHelper()
{
_rentedBuffer.AsSpan(0, _written).Clear();
_written = 0;
}

public void Advance(int count)
{
CheckIfDisposed();

if (count < 0)
throw new ArgumentException(nameof(count));

if (_written > _rentedBuffer.Length - count)
throw new InvalidOperationException("Cannot advance past the end of the buffer.");

_written += count;
}

// Returns the rented buffer back to the pool
public void Dispose()
{
if (_rentedBuffer == null)
{
return;
}

ArrayPool<T>.Shared.Return(_rentedBuffer, clearArray: true);
_rentedBuffer = null;
_written = 0;
}

private void CheckIfDisposed()
{
if (_rentedBuffer == null)
throw new ObjectDisposedException(nameof(ArrayBufferWriter<T>));
}

public Memory<T> GetMemory(int sizeHint = 0)
{
CheckIfDisposed();

if (sizeHint < 0)
throw new ArgumentException(nameof(sizeHint));

CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsMemory(_written);
}

public Span<T> GetSpan(int sizeHint = 0)
{
CheckIfDisposed();

if (sizeHint < 0)
throw new ArgumentException(nameof(sizeHint));

CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsSpan(_written);
}

private void CheckAndResizeBuffer(int sizeHint)
{
Debug.Assert(sizeHint >= 0);

if (sizeHint == 0)
{
sizeHint = MinimumBufferSize;
}

int availableSpace = _rentedBuffer.Length - _written;

if (sizeHint > availableSpace)
{
int growBy = sizeHint > _rentedBuffer.Length ? sizeHint : _rentedBuffer.Length;

int newSize = checked(_rentedBuffer.Length + growBy);

T[] oldBuffer = _rentedBuffer;

_rentedBuffer = ArrayPool<T>.Shared.Rent(newSize);

Debug.Assert(oldBuffer.Length >= _written);
Debug.Assert(_rentedBuffer.Length >= _written);

oldBuffer.AsSpan(0, _written).CopyTo(_rentedBuffer);
ArrayPool<T>.Shared.Return(oldBuffer, clearArray: true);
}

Debug.Assert(_rentedBuffer.Length - _written > 0);
Debug.Assert(_rentedBuffer.Length - _written >= sizeHint);
}
}
}
#endif
36 changes: 31 additions & 5 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Text;

using BenchmarkDotNet.Attributes;
Expand All @@ -19,7 +20,12 @@ public class MethodFramingBasicAck
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
public ReadOnlyMemory<byte> BasicAckWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicAck, _writer, Channel);
return _writer.WrittenMemory;
}
}

[Config(typeof(Config))]
Expand All @@ -41,13 +47,28 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, _writer, Channel, FrameMax);
return _writer.WrittenMemory;
}

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
return _writer.WrittenMemory;
}

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishMemoryWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, _writer, Channel, FrameMax);
return _writer.WrittenMemory;
}
}

[Config(typeof(Config))]
Expand All @@ -60,6 +81,11 @@ public class MethodFramingChannelClose
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
public ReadOnlyMemory<byte> ChannelCloseWrite()
{
ArrayBufferWriter<byte> _writer = new ArrayBufferWriter<byte>();
Framing.SerializeToFrames(ref _channelClose, _writer, Channel);
return _writer.WrittenMemory;
}
}
}
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@
<PackageReference Include="MinVer" Version="3.1.0" PrivateAssets="All" />
<PackageReference Include="System.Memory" Version="4.5.4" />
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.2.2" />
</ItemGroup>

</Project>
</Project>
11 changes: 5 additions & 6 deletions projects/RabbitMQ.Client/client/api/ConnectionFactoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

using System;
using System.Net.Sockets;

using Pipelines.Sockets.Unofficial;

using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
Expand All @@ -49,12 +52,8 @@ public class ConnectionFactoryBase
/// <returns>New instance of a <see cref="TcpClient"/>.</returns>
public static ITcpClient DefaultSocketFactory(AddressFamily addressFamily)
{
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp)
{
NoDelay = true,
ReceiveBufferSize = 65536,
SendBufferSize = 65536
};
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SocketConnection.SetRecommendedClientOptions(socket);
return new TcpClientAdapter(socket);
}
}
Expand Down
Loading