Skip to content

Commit

Permalink
Merge pull request #1098 from bollhals/feature/refInsteadOfIn
Browse files Browse the repository at this point in the history
use ref instead of in for generic T + interface
  • Loading branch information
michaelklishin authored Oct 17, 2021
2 parents 7640036 + 1b1aed7 commit 5d60790
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 76 deletions.
18 changes: 9 additions & 9 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ namespace RabbitMQ.Benchmarks
[BenchmarkCategory("Framing")]
public class MethodFramingBasicAck
{
private readonly BasicAck _basicAck = new BasicAck(ulong.MaxValue, true);
private BasicAck _basicAck = new BasicAck(ulong.MaxValue, true);

[Params(0)]
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(_basicAck, Channel);
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingBasicPublish
{
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private readonly BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private readonly BasicProperties _propertiesEmpty = new BasicProperties();
private readonly BasicProperties _properties = new BasicProperties { AppId = "Application id", MessageId = "Random message id" };
private readonly ReadOnlyMemory<byte> _bodyEmpty = ReadOnlyMemory<byte>.Empty;
Expand All @@ -42,25 +42,25 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(_basicPublish, _properties, _body, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, _properties, _body, Channel, FrameMax);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(_basicPublish, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(_basicPublishMemory, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingChannelClose
{
private readonly ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999);
private ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999);

[Params(0)]
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(_channelClose, Channel);
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
}
}
111 changes: 68 additions & 43 deletions projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,107 +44,123 @@ public Model(bool dispatchAsync, int concurrency, ISession session) : base(dispa

public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
{
ModelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
var cmd = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
ModelSend(ref cmd);
}

public override void _Private_BasicCancel(string consumerTag, bool nowait)
{
ModelSend(new BasicCancel(consumerTag, nowait));
var cmd = new BasicCancel(consumerTag, nowait);
ModelSend(ref cmd);
}

public override void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary<string, object> arguments)
{
ModelSend(new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments));
var cmd = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments);
ModelSend(ref cmd);
}

public override void _Private_BasicGet(string queue, bool autoAck)
{
ModelSend(new BasicGet(queue, autoAck));
var cmd = new BasicGet(queue, autoAck);
ModelSend(ref cmd);
}

public override void _Private_BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
ModelSend(new BasicPublish(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body);
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
ModelSend(ref cmd, (BasicProperties) basicProperties, body);
}

public override void _Private_BasicPublishMemory(ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
ModelSend(new BasicPublishMemory(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body);
var cmd = new BasicPublishMemory(exchange, routingKey, mandatory, default);
ModelSend(ref cmd, (BasicProperties) basicProperties, body);
}

public override void _Private_BasicRecover(bool requeue)
{
ModelSend(new BasicRecover(requeue));
var cmd = new BasicRecover(requeue);
ModelSend(ref cmd);
}

public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
{
ModelSend(new ChannelClose(replyCode, replyText, classId, methodId));
var cmd = new ChannelClose(replyCode, replyText, classId, methodId);
ModelSend(ref cmd);
}

public override void _Private_ChannelCloseOk()
{
ModelSend(new ChannelCloseOk());
var cmd = new ChannelCloseOk();
ModelSend(ref cmd);
}

public override void _Private_ChannelFlowOk(bool active)
{
ModelSend(new ChannelFlowOk(active));
var cmd = new ChannelFlowOk(active);
ModelSend(ref cmd);
}

public override void _Private_ChannelOpen()
{
ModelRpc(new ChannelOpen(), ProtocolCommandId.ChannelOpenOk);
var cmd = new ChannelOpen();
ModelRpc(ref cmd, ProtocolCommandId.ChannelOpenOk);
}

public override void _Private_ConfirmSelect(bool nowait)
{
var method = new ConfirmSelect(nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ConfirmSelectOk);
ModelRpc(ref method, ProtocolCommandId.ConfirmSelectOk);
}
}

public override void _Private_ConnectionCloseOk()
{
ModelSend(new ConnectionCloseOk());
var cmd = new ConnectionCloseOk();
ModelSend(ref cmd);
}

public override void _Private_ConnectionOpen(string virtualHost)
{
ModelSend(new ConnectionOpen(virtualHost));
var cmd = new ConnectionOpen(virtualHost);
ModelSend(ref cmd);
}

public override void _Private_ConnectionSecureOk(byte[] response)
{
ModelSend(new ConnectionSecureOk(response));
var cmd = new ConnectionSecureOk(response);
ModelSend(ref cmd);
}

public override void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
{
ModelSend(new ConnectionStartOk(clientProperties, mechanism, response, locale));
var cmd = new ConnectionStartOk(clientProperties, mechanism, response, locale);
ModelSend(ref cmd);
}

public override void _Private_UpdateSecret(byte[] newSecret, string reason)
{
ModelRpc(new ConnectionUpdateSecret(newSecret, reason), ProtocolCommandId.ConnectionUpdateSecretOk);
var cmd = new ConnectionUpdateSecret(newSecret, reason);
ModelRpc(ref cmd, ProtocolCommandId.ConnectionUpdateSecretOk);
}

public override void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
ExchangeBind method = new ExchangeBind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeBindOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeBindOk);
}
}

Expand All @@ -153,11 +169,11 @@ public override void _Private_ExchangeDeclare(string exchange, string type, bool
ExchangeDeclare method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeDeclareOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeDeclareOk);
}
}

Expand All @@ -166,11 +182,11 @@ public override void _Private_ExchangeDelete(string exchange, bool ifUnused, boo
ExchangeDelete method = new ExchangeDelete(exchange, ifUnused, nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeDeleteOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeDeleteOk);
}
}

Expand All @@ -179,11 +195,11 @@ public override void _Private_ExchangeUnbind(string destination, string source,
ExchangeUnbind method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeUnbindOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeUnbindOk);
}
}

Expand All @@ -192,11 +208,11 @@ public override void _Private_QueueBind(string queue, string exchange, string ro
QueueBind method = new QueueBind(queue, exchange, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.QueueBindOk);
ModelRpc(ref method, ProtocolCommandId.QueueBindOk);
}
}

Expand All @@ -205,11 +221,11 @@ public override void _Private_QueueDeclare(string queue, bool passive, bool dura
QueueDeclare method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelSend(method);
ModelSend(ref method);
}
}

Expand All @@ -218,48 +234,53 @@ public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEm
QueueDelete method = new QueueDelete(queue, ifUnused, ifEmpty, nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
return 0xFFFFFFFF;
}

return ModelRpc(method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
return ModelRpc(ref method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
}

public override uint _Private_QueuePurge(string queue, bool nowait)
{
QueuePurge method = new QueuePurge(queue, nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
return 0xFFFFFFFF;
}

return ModelRpc(method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
return ModelRpc(ref method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
}

public override void BasicAck(ulong deliveryTag, bool multiple)
{
ModelSend(new BasicAck(deliveryTag, multiple));
var cmd = new BasicAck(deliveryTag, multiple);
ModelSend(ref cmd);
}

public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
{
ModelSend(new BasicNack(deliveryTag, multiple, requeue));
var cmd = new BasicNack(deliveryTag, multiple, requeue);
ModelSend(ref cmd);
}

public override void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
ModelRpc(new BasicQos(prefetchSize, prefetchCount, global), ProtocolCommandId.BasicQosOk);
var cmd = new BasicQos(prefetchSize, prefetchCount, global);
ModelRpc(ref cmd, ProtocolCommandId.BasicQosOk);
}

public override void BasicRecoverAsync(bool requeue)
{
ModelSend(new BasicRecoverAsync(requeue));
var cmd = new BasicRecoverAsync(requeue);
ModelSend(ref cmd);
}

public override void BasicReject(ulong deliveryTag, bool requeue)
{
ModelSend(new BasicReject(deliveryTag, requeue));
var cmd = new BasicReject(deliveryTag, requeue);
ModelSend(ref cmd);
}

public override IBasicProperties CreateBasicProperties()
Expand All @@ -269,22 +290,26 @@ public override IBasicProperties CreateBasicProperties()

public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
ModelRpc(new QueueUnbind(queue, exchange, routingKey, arguments), ProtocolCommandId.QueueUnbindOk);
var cmd = new QueueUnbind(queue, exchange, routingKey, arguments);
ModelRpc(ref cmd, ProtocolCommandId.QueueUnbindOk);
}

public override void TxCommit()
{
ModelRpc(new TxCommit(), ProtocolCommandId.TxCommitOk);
var cmd = new TxCommit();
ModelRpc(ref cmd, ProtocolCommandId.TxCommitOk);
}

public override void TxRollback()
{
ModelRpc(new TxRollback(), ProtocolCommandId.TxRollbackOk);
var cmd = new TxRollback();
ModelRpc(ref cmd, ProtocolCommandId.TxRollbackOk);
}

public override void TxSelect()
{
ModelRpc(new TxSelect(), ProtocolCommandId.TxSelectOk);
var cmd = new TxSelect();
ModelRpc(ref cmd, ProtocolCommandId.TxSelectOk);
}

protected override bool DispatchAsynchronous(in IncomingCommand cmd)
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.SetSessionClosing(false);
try
{
_session0.Transmit(new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0));
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
ClosingLoop();
}
catch (IOException ioe)
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
try
{
// Try to send connection.close wait for CloseOk in the MainLoop
_session0.Transmit(new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0));
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
}
catch (AlreadyClosedException)
{
Expand Down
Loading

0 comments on commit 5d60790

Please sign in to comment.