Skip to content

Commit

Permalink
Merge pull request #844 from stebet/frameOptimizations
Browse files Browse the repository at this point in the history
Frame optimizations

(cherry picked from commit ed3b780)
  • Loading branch information
michaelklishin committed May 25, 2020
1 parent ce9b039 commit 595ac87
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 95 deletions.
2 changes: 1 addition & 1 deletion _site
Submodule _site updated 161 files
48 changes: 2 additions & 46 deletions projects/RabbitMQ.Client/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,64 +98,20 @@ public static void CheckEmptyFrameSize()
}

internal void Transmit(int channelNumber, Connection connection)
{
if (Method.HasContent)
{
TransmitAsFrameSet(channelNumber, connection);
}
else
{
TransmitAsSingleFrame(channelNumber, connection);
}
}

internal void TransmitAsSingleFrame(int channelNumber, Connection connection)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
}

internal void TransmitAsFrameSet(int channelNumber, Connection connection)
{
var frames = new List<OutboundFrame> { new MethodOutboundFrame(channelNumber, Method) };
if (Method.HasContent)
{
frames.Add(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
{
int remaining = Body.Length - offset;
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
}
}

connection.WriteFrameSet(frames);
}


internal static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
{
var frames = new List<OutboundFrame>();

foreach (Command cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, cmd.Body.Length));
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0) ? cmd.Body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < cmd.Body.Length; offset += bodyPayloadMax)
{
int remaining = cmd.Body.Length - offset;
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, cmd.Body.Slice(offset, count)));
}
}
}

return frames;
}

public void Dispose()
Expand Down
5 changes: 0 additions & 5 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -948,11 +948,6 @@ public void WriteFrame(OutboundFrame f)
_frameHandler.WriteFrame(f);
}

public void WriteFrameSet(IList<OutboundFrame> f)
{
_frameHandler.WriteFrameSet(f);
}

public void UpdateSecret(string newSecret, string reason)
{
_model0.UpdateSecret(newSecret, reason);
Expand Down
4 changes: 1 addition & 3 deletions projects/RabbitMQ.Client/client/impl/IFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ interface IFrameHandler

void SendHeader();

void WriteFrame(OutboundFrame frame, bool flush = true);

void WriteFrameSet(IList<OutboundFrame> frames);
void WriteFrame(OutboundFrame frame);
}
}
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,14 @@ public virtual void Transmit(Command cmd)
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
}

public virtual void Transmit(IList<Command> commands)
{
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
for (int i = 0; i < commands.Count; i++)
{
Command command = commands[i];
command.Transmit(ChannelNumber, Connection);
}
}
}
}
61 changes: 29 additions & 32 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -79,7 +80,8 @@ class SocketFrameHandler : IFrameHandler
private readonly ITcpClient _socket;
private readonly Stream _writer;
private readonly object _semaphore = new object();
private readonly object _streamLock = new object();
private readonly Channel<OutboundFrame> _frameChannel = Channel.CreateUnbounded<OutboundFrame>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });
private Task _frameWriter;
private bool _closed;
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
Expand Down Expand Up @@ -124,6 +126,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);

WriteTimeout = writeTimeout;
_frameWriter = Task.Run(WriteFrameImpl);
}
public AmqpTcpEndpoint Endpoint { get; set; }

Expand Down Expand Up @@ -181,6 +184,15 @@ public void Close()
{
if (!_closed)
{
try
{
_frameChannel.Writer.Complete();
_frameWriter.Wait();
}
catch(Exception)
{
}

try
{
_socket.Close();
Expand Down Expand Up @@ -222,46 +234,31 @@ public void SendHeader()
headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion;
}

Write(new ArraySegment<byte>(headerBytes), true);
_writer.Write(headerBytes, 0, 8);
_writer.Flush();
}

public void WriteFrame(OutboundFrame frame, bool flush = true)
public void WriteFrame(OutboundFrame frame)
{
int bufferSize = frame.GetMinimumBufferSize();
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
frame.WriteTo(slice);
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
Write(slice.Slice(0, frame.ByteCount), flush);
ArrayPool<byte>.Shared.Return(memoryArray);
}

public void WriteFrameSet(IList<OutboundFrame> frames)
{
for (int i = 0; i < frames.Count; i++)
{
WriteFrame(frames[i], false);
}

lock (_streamLock)
{
_writer.Flush();
}
_frameChannel.Writer.TryWrite(frame);
}

private void Write(ReadOnlyMemory<byte> buffer, bool flush)
public async Task WriteFrameImpl()
{
lock (_streamLock)
while (await _frameChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_frameChannel.Reader.TryRead(out OutboundFrame frame))
{
_writer.Write(segment.Array, segment.Offset, segment.Count);

if (flush)
{
_writer.Flush();
}
int bufferSize = frame.GetMinimumBufferSize();
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
frame.WriteTo(slice);
_writer.Write(memoryArray, 0, bufferSize);
ArrayPool<byte>.Shared.Return(memoryArray);
}

_writer.Flush();
}
}

Expand Down
16 changes: 9 additions & 7 deletions projects/Unit/TestRecoverAfterCancel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,23 @@ public void TestRecoverAfterCancel_()
UTF8Encoding enc = new UTF8Encoding();
Channel.BasicPublish("", Queue, null, enc.GetBytes("message"));
EventingBasicConsumer Consumer = new EventingBasicConsumer(Channel);
SharedQueue<BasicDeliverEventArgs> EventQueue = new SharedQueue<BasicDeliverEventArgs>();
Consumer.Received += (_, e) => EventQueue.Enqueue(e);
SharedQueue<(bool Redelivered, byte[] Body)> EventQueue = new SharedQueue<(bool Redelivered, byte[] Body)>();
// Making sure we copy the delivery body since it could be disposed at any time.
Consumer.Received += (_, e) => EventQueue.Enqueue((e.Redelivered, e.Body.ToArray()));

string CTag = Channel.BasicConsume(Queue, false, Consumer);
BasicDeliverEventArgs Event = EventQueue.Dequeue();
(bool Redelivered, byte[] Body) Event = EventQueue.Dequeue();
Channel.BasicCancel(CTag);
Channel.BasicRecover(true);

EventingBasicConsumer Consumer2 = new EventingBasicConsumer(Channel);
SharedQueue<BasicDeliverEventArgs> EventQueue2 = new SharedQueue<BasicDeliverEventArgs>();
Consumer2.Received += (_, e) => EventQueue2.Enqueue(e);
SharedQueue<(bool Redelivered, byte[] Body)> EventQueue2 = new SharedQueue<(bool Redelivered, byte[] Body)>();
// Making sure we copy the delivery body since it could be disposed at any time.
Consumer2.Received += (_, e) => EventQueue2.Enqueue((e.Redelivered, e.Body.ToArray()));
Channel.BasicConsume(Queue, false, Consumer2);
BasicDeliverEventArgs Event2 = EventQueue2.Dequeue();
(bool Redelivered, byte[] Body) Event2 = EventQueue2.Dequeue();

CollectionAssert.AreEqual(Event.Body.ToArray(), Event2.Body.ToArray());
CollectionAssert.AreEqual(Event.Body, Event2.Body);
Assert.IsFalse(Event.Redelivered);
Assert.IsTrue(Event2.Redelivered);
}
Expand Down

0 comments on commit 595ac87

Please sign in to comment.