Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use ref instead of in for generic T + interface #1098

Merged
merged 1 commit into from
Oct 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ namespace RabbitMQ.Benchmarks
[BenchmarkCategory("Framing")]
public class MethodFramingBasicAck
{
private readonly BasicAck _basicAck = new BasicAck(ulong.MaxValue, true);
private BasicAck _basicAck = new BasicAck(ulong.MaxValue, true);

[Params(0)]
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(_basicAck, Channel);
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingBasicPublish
{
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private readonly BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private readonly BasicProperties _propertiesEmpty = new BasicProperties();
private readonly BasicProperties _properties = new BasicProperties { AppId = "Application id", MessageId = "Random message id" };
private readonly ReadOnlyMemory<byte> _bodyEmpty = ReadOnlyMemory<byte>.Empty;
Expand All @@ -42,25 +42,25 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(_basicPublish, _properties, _body, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, _properties, _body, Channel, FrameMax);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(_basicPublish, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(_basicPublishMemory, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingChannelClose
{
private readonly ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999);
private ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999);

[Params(0)]
public ushort Channel { get; set; }

[Benchmark]
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(_channelClose, Channel);
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
}
}
111 changes: 68 additions & 43 deletions projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,107 +44,123 @@ public Model(bool dispatchAsync, int concurrency, ISession session) : base(dispa

public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
{
ModelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
var cmd = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
ModelSend(ref cmd);
}

public override void _Private_BasicCancel(string consumerTag, bool nowait)
{
ModelSend(new BasicCancel(consumerTag, nowait));
var cmd = new BasicCancel(consumerTag, nowait);
ModelSend(ref cmd);
}

public override void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary<string, object> arguments)
{
ModelSend(new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments));
var cmd = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments);
ModelSend(ref cmd);
}

public override void _Private_BasicGet(string queue, bool autoAck)
{
ModelSend(new BasicGet(queue, autoAck));
var cmd = new BasicGet(queue, autoAck);
ModelSend(ref cmd);
}

public override void _Private_BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
ModelSend(new BasicPublish(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body);
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
ModelSend(ref cmd, (BasicProperties) basicProperties, body);
}

public override void _Private_BasicPublishMemory(ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
ModelSend(new BasicPublishMemory(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body);
var cmd = new BasicPublishMemory(exchange, routingKey, mandatory, default);
ModelSend(ref cmd, (BasicProperties) basicProperties, body);
}

public override void _Private_BasicRecover(bool requeue)
{
ModelSend(new BasicRecover(requeue));
var cmd = new BasicRecover(requeue);
ModelSend(ref cmd);
}

public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
{
ModelSend(new ChannelClose(replyCode, replyText, classId, methodId));
var cmd = new ChannelClose(replyCode, replyText, classId, methodId);
ModelSend(ref cmd);
}

public override void _Private_ChannelCloseOk()
{
ModelSend(new ChannelCloseOk());
var cmd = new ChannelCloseOk();
ModelSend(ref cmd);
}

public override void _Private_ChannelFlowOk(bool active)
{
ModelSend(new ChannelFlowOk(active));
var cmd = new ChannelFlowOk(active);
ModelSend(ref cmd);
}

public override void _Private_ChannelOpen()
{
ModelRpc(new ChannelOpen(), ProtocolCommandId.ChannelOpenOk);
var cmd = new ChannelOpen();
ModelRpc(ref cmd, ProtocolCommandId.ChannelOpenOk);
}

public override void _Private_ConfirmSelect(bool nowait)
{
var method = new ConfirmSelect(nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ConfirmSelectOk);
ModelRpc(ref method, ProtocolCommandId.ConfirmSelectOk);
}
}

public override void _Private_ConnectionCloseOk()
{
ModelSend(new ConnectionCloseOk());
var cmd = new ConnectionCloseOk();
ModelSend(ref cmd);
}

public override void _Private_ConnectionOpen(string virtualHost)
{
ModelSend(new ConnectionOpen(virtualHost));
var cmd = new ConnectionOpen(virtualHost);
ModelSend(ref cmd);
}

public override void _Private_ConnectionSecureOk(byte[] response)
{
ModelSend(new ConnectionSecureOk(response));
var cmd = new ConnectionSecureOk(response);
ModelSend(ref cmd);
}

public override void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
{
ModelSend(new ConnectionStartOk(clientProperties, mechanism, response, locale));
var cmd = new ConnectionStartOk(clientProperties, mechanism, response, locale);
ModelSend(ref cmd);
}

public override void _Private_UpdateSecret(byte[] newSecret, string reason)
{
ModelRpc(new ConnectionUpdateSecret(newSecret, reason), ProtocolCommandId.ConnectionUpdateSecretOk);
var cmd = new ConnectionUpdateSecret(newSecret, reason);
ModelRpc(ref cmd, ProtocolCommandId.ConnectionUpdateSecretOk);
}

public override void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
ExchangeBind method = new ExchangeBind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeBindOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeBindOk);
}
}

Expand All @@ -153,11 +169,11 @@ public override void _Private_ExchangeDeclare(string exchange, string type, bool
ExchangeDeclare method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeDeclareOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeDeclareOk);
}
}

Expand All @@ -166,11 +182,11 @@ public override void _Private_ExchangeDelete(string exchange, bool ifUnused, boo
ExchangeDelete method = new ExchangeDelete(exchange, ifUnused, nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeDeleteOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeDeleteOk);
}
}

Expand All @@ -179,11 +195,11 @@ public override void _Private_ExchangeUnbind(string destination, string source,
ExchangeUnbind method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.ExchangeUnbindOk);
ModelRpc(ref method, ProtocolCommandId.ExchangeUnbindOk);
}
}

Expand All @@ -192,11 +208,11 @@ public override void _Private_QueueBind(string queue, string exchange, string ro
QueueBind method = new QueueBind(queue, exchange, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelRpc(method, ProtocolCommandId.QueueBindOk);
ModelRpc(ref method, ProtocolCommandId.QueueBindOk);
}
}

Expand All @@ -205,11 +221,11 @@ public override void _Private_QueueDeclare(string queue, bool passive, bool dura
QueueDeclare method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
}
else
{
ModelSend(method);
ModelSend(ref method);
}
}

Expand All @@ -218,48 +234,53 @@ public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEm
QueueDelete method = new QueueDelete(queue, ifUnused, ifEmpty, nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
return 0xFFFFFFFF;
}

return ModelRpc(method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
return ModelRpc(ref method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
}

public override uint _Private_QueuePurge(string queue, bool nowait)
{
QueuePurge method = new QueuePurge(queue, nowait);
if (nowait)
{
ModelSend(method);
ModelSend(ref method);
return 0xFFFFFFFF;
}

return ModelRpc(method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
return ModelRpc(ref method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
}

public override void BasicAck(ulong deliveryTag, bool multiple)
{
ModelSend(new BasicAck(deliveryTag, multiple));
var cmd = new BasicAck(deliveryTag, multiple);
ModelSend(ref cmd);
}

public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
{
ModelSend(new BasicNack(deliveryTag, multiple, requeue));
var cmd = new BasicNack(deliveryTag, multiple, requeue);
ModelSend(ref cmd);
}

public override void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
ModelRpc(new BasicQos(prefetchSize, prefetchCount, global), ProtocolCommandId.BasicQosOk);
var cmd = new BasicQos(prefetchSize, prefetchCount, global);
ModelRpc(ref cmd, ProtocolCommandId.BasicQosOk);
}

public override void BasicRecoverAsync(bool requeue)
{
ModelSend(new BasicRecoverAsync(requeue));
var cmd = new BasicRecoverAsync(requeue);
ModelSend(ref cmd);
}

public override void BasicReject(ulong deliveryTag, bool requeue)
{
ModelSend(new BasicReject(deliveryTag, requeue));
var cmd = new BasicReject(deliveryTag, requeue);
ModelSend(ref cmd);
}

public override IBasicProperties CreateBasicProperties()
Expand All @@ -269,22 +290,26 @@ public override IBasicProperties CreateBasicProperties()

public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
ModelRpc(new QueueUnbind(queue, exchange, routingKey, arguments), ProtocolCommandId.QueueUnbindOk);
var cmd = new QueueUnbind(queue, exchange, routingKey, arguments);
ModelRpc(ref cmd, ProtocolCommandId.QueueUnbindOk);
}

public override void TxCommit()
{
ModelRpc(new TxCommit(), ProtocolCommandId.TxCommitOk);
var cmd = new TxCommit();
ModelRpc(ref cmd, ProtocolCommandId.TxCommitOk);
}

public override void TxRollback()
{
ModelRpc(new TxRollback(), ProtocolCommandId.TxRollbackOk);
var cmd = new TxRollback();
ModelRpc(ref cmd, ProtocolCommandId.TxRollbackOk);
}

public override void TxSelect()
{
ModelRpc(new TxSelect(), ProtocolCommandId.TxSelectOk);
var cmd = new TxSelect();
ModelRpc(ref cmd, ProtocolCommandId.TxSelectOk);
}

protected override bool DispatchAsynchronous(in IncomingCommand cmd)
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.SetSessionClosing(false);
try
{
_session0.Transmit(new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0));
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
ClosingLoop();
}
catch (IOException ioe)
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
try
{
// Try to send connection.close wait for CloseOk in the MainLoop
_session0.Transmit(new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0));
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
}
catch (AlreadyClosedException)
{
Expand Down
Loading