Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code cleanups and performance optimizations. #1017

Merged
merged 4 commits into from
Feb 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/client/framing/Protocol.cs
Original file line number Diff line number Diff line change
@@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Framing.Impl;

@@ -54,10 +56,13 @@ internal sealed class Protocol : ProtocolBase

internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)
{
var commandId = (ProtocolCommandId)Util.NetworkOrderDeserializer.ReadUInt32(span);
ProtocolCommandId commandId = ReadCommandId(span);
return DecodeMethodFrom(commandId, span.Slice(4));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ProtocolCommandId ReadCommandId(ReadOnlySpan<byte> span) => (ProtocolCommandId)Util.NetworkOrderDeserializer.ReadUInt32(span);

private static Client.Impl.MethodBase DecodeMethodFrom(ProtocolCommandId commandId, ReadOnlySpan<byte> span)
{
switch (commandId)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
@@ -67,6 +67,7 @@ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reaso
Schedule(new ModelShutdown(consumer, reason, _model));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ScheduleUnlessShuttingDown(Work work)
{
if (!IsShutdown)
@@ -75,6 +76,7 @@ private void ScheduleUnlessShuttingDown(Work work)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Schedule(Work work)
{
_workService.Schedule(_model, work);
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@

using System;
using System.Buffers;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;
@@ -126,7 +127,7 @@ private void ParseHeaderFrame(in InboundFrame frame)
_bodyBytes = Array.Empty<byte>();
}

_remainingBodyBytes = (int) totalBodyBytes;
_remainingBodyBytes = (int)totalBodyBytes;
UpdateContentBodyState();
}

37 changes: 14 additions & 23 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
@@ -137,7 +138,7 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
add
{
ThrowIfDisposed();
if (CloseReason is null)
if (IsOpen)
{
_connectionShutdownWrapper.AddHandler(value);
}
@@ -157,31 +158,35 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<EventArgs> RecoverySucceeded {
public event EventHandler<EventArgs> RecoverySucceeded
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError {
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery {
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery {
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery
{
add { }
remove { }
}
@@ -273,7 +278,7 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
{
// Try to send connection.close
// Wait for CloseOk in the MainLoop
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
_session0.Transmit(new OutgoingCommand(new Impl.ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0)));
}
catch (AlreadyClosedException)
{
@@ -357,12 +362,6 @@ private void ClosingLoop()
}
}

private OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateConnectionClose(reasonCode, reasonText, out OutgoingCommand request, out _);
return request;
}

internal ISession CreateSession()
{
return _sessionManager.Create();
@@ -407,9 +406,7 @@ private bool HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.SetSessionClosing(false);
try
{
_session0.Transmit(ConnectionCloseWrapper(
hpe.ShutdownReason.ReplyCode,
hpe.ShutdownReason.ReplyText));
_session0.Transmit(new OutgoingCommand(new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0)));
return true;
}
catch (IOException ioe)
@@ -646,12 +643,12 @@ private void QuiesceChannel(SoftProtocolException pe)
// our peer. The peer will respond through the lower
// layers - specifically, through the QuiescingSession we
// installed above.
newSession.Transmit(ChannelCloseWrapper(pe.ReplyCode, pe.Message));
newSession.Transmit(new OutgoingCommand(new Impl.ChannelClose(pe.ReplyCode, pe.Message, 0, 0)));
}

private bool SetCloseReason(ShutdownEventArgs reason)
{
return System.Threading.Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
}

private void MaybeStartHeartbeatTimers()
@@ -838,12 +835,6 @@ public void Dispose()
}
}

private OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateChannelClose(reasonCode, reasonText, out OutgoingCommand request);
return request;
}

private void StartAndTune()
{
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
52 changes: 25 additions & 27 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
@@ -165,23 +165,22 @@ private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload,
_rentedArray = rentedArray;
}

private static void ProcessProtocolHeader(Stream reader)
private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> frameHeader)
{
try
{
byte b1 = (byte)reader.ReadByte();
byte b2 = (byte)reader.ReadByte();
byte b3 = (byte)reader.ReadByte();
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
if (frameHeader[0] != 'M' || frameHeader[1] != 'Q' || frameHeader[2] != 'P')
{
throw new MalformedFrameException("Invalid AMQP protocol header from server");
}

int transportHigh = reader.ReadByte();
int transportLow = reader.ReadByte();
int serverMajor = reader.ReadByte();
int serverMinor = reader.ReadByte();
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
if (serverMinor == -1)
{
throw new EndOfStreamException();
}

throw new PacketNotRecognizedException(frameHeader[3], frameHeader[4], frameHeader[5], serverMinor);
}
catch (EndOfStreamException)
{
@@ -198,39 +197,38 @@ private static void ProcessProtocolHeader(Stream reader)

internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
{
int type = default;
try
{
type = reader.ReadByte();
if (reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length) == 0)
{
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
}
}
catch (IOException ioe)
{
// If it's a WSAETIMEDOUT SocketException, unwrap it.
// This might happen when the limit of half-open connections is
// reached.
if (ioe.InnerException is null ||
!(ioe.InnerException is SocketException exception) ||
exception.SocketErrorCode != SocketError.TimedOut)
if (ioe?.InnerException is SocketException exception && exception.SocketErrorCode == SocketError.TimedOut)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
else
{
throw;
}

ExceptionDispatchInfo.Capture(ioe.InnerException).Throw();
}

switch (type)
byte firstByte = frameHeaderBuffer[0];
if (firstByte == 'A')
{
case -1:
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
case 'A':
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader);
break;
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader, frameHeaderBuffer.AsSpan(1, 6));
}

reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length);
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4)); // FIXME - throw exn on unreasonable value
FrameType type = (FrameType)firstByte;
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 2));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 3, 4)); // FIXME - throw exn on unreasonable value

const int EndMarkerLength = 1;
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
@@ -257,7 +255,7 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
}

return new InboundFrame((FrameType)type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
return new InboundFrame(type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
}

public byte[] TakeoverPayload()
Loading