From 1b1aed7861e097335d1cd5da7d59c5735c7b7c6d Mon Sep 17 00:00:00 2001 From: bollhals Date: Fri, 15 Oct 2021 00:07:26 +0200 Subject: [PATCH] use ref instead of in for generic T + interface --- .../WireFormatting/MethodFraming.cs | 18 +-- .../RabbitMQ.Client/client/framing/Model.cs | 111 +++++++++++------- .../client/impl/Connection.Receive.cs | 3 +- .../RabbitMQ.Client/client/impl/Connection.cs | 3 +- projects/RabbitMQ.Client/client/impl/Frame.cs | 10 +- .../RabbitMQ.Client/client/impl/ISession.cs | 4 +- .../client/impl/MainSession.cs | 4 +- .../RabbitMQ.Client/client/impl/ModelBase.cs | 16 +-- .../client/impl/SessionBase.cs | 8 +- projects/Unit/TestFrameFormatting.cs | 2 +- 10 files changed, 103 insertions(+), 76 deletions(-) diff --git a/projects/Benchmarks/WireFormatting/MethodFraming.cs b/projects/Benchmarks/WireFormatting/MethodFraming.cs index 3321bab50d..22a43a4c7d 100644 --- a/projects/Benchmarks/WireFormatting/MethodFraming.cs +++ b/projects/Benchmarks/WireFormatting/MethodFraming.cs @@ -14,13 +14,13 @@ namespace RabbitMQ.Benchmarks [BenchmarkCategory("Framing")] public class MethodFramingBasicAck { - private readonly BasicAck _basicAck = new BasicAck(ulong.MaxValue, true); + private BasicAck _basicAck = new BasicAck(ulong.MaxValue, true); [Params(0)] public ushort Channel { get; set; } [Benchmark] - public ReadOnlyMemory BasicAckWrite() => Framing.SerializeToFrames(_basicAck, Channel); + public ReadOnlyMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel); } [Config(typeof(Config))] @@ -28,8 +28,8 @@ public class MethodFramingBasicAck public class MethodFramingBasicPublish { private const string StringValue = "Exchange_OR_RoutingKey"; - private readonly BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false); - private readonly BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false); + private BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false); + private BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false); private readonly BasicProperties _propertiesEmpty = new BasicProperties(); private readonly BasicProperties _properties = new BasicProperties { AppId = "Application id", MessageId = "Random message id" }; private readonly ReadOnlyMemory _bodyEmpty = ReadOnlyMemory.Empty; @@ -42,25 +42,25 @@ public class MethodFramingBasicPublish public int FrameMax { get; set; } [Benchmark] - public ReadOnlyMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(_basicPublish, _properties, _body, Channel, FrameMax); + public ReadOnlyMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, _properties, _body, Channel, FrameMax); [Benchmark] - public ReadOnlyMemory BasicPublishWrite() => Framing.SerializeToFrames(_basicPublish, _propertiesEmpty, _bodyEmpty, Channel, FrameMax); + public ReadOnlyMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, _propertiesEmpty, _bodyEmpty, Channel, FrameMax); [Benchmark] - public ReadOnlyMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(_basicPublishMemory, _propertiesEmpty, _bodyEmpty, Channel, FrameMax); + public ReadOnlyMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, _propertiesEmpty, _bodyEmpty, Channel, FrameMax); } [Config(typeof(Config))] [BenchmarkCategory("Framing")] public class MethodFramingChannelClose { - private readonly ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999); + private ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999); [Params(0)] public ushort Channel { get; set; } [Benchmark] - public ReadOnlyMemory ChannelCloseWrite() => Framing.SerializeToFrames(_channelClose, Channel); + public ReadOnlyMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel); } } diff --git a/projects/RabbitMQ.Client/client/framing/Model.cs b/projects/RabbitMQ.Client/client/framing/Model.cs index b2f7e18ca6..c79c973777 100644 --- a/projects/RabbitMQ.Client/client/framing/Model.cs +++ b/projects/RabbitMQ.Client/client/framing/Model.cs @@ -44,57 +44,68 @@ public Model(bool dispatchAsync, int concurrency, ISession session) : base(dispa public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat) { - ModelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat)); + var cmd = new ConnectionTuneOk(channelMax, frameMax, heartbeat); + ModelSend(ref cmd); } public override void _Private_BasicCancel(string consumerTag, bool nowait) { - ModelSend(new BasicCancel(consumerTag, nowait)); + var cmd = new BasicCancel(consumerTag, nowait); + ModelSend(ref cmd); } public override void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary arguments) { - ModelSend(new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments)); + var cmd = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments); + ModelSend(ref cmd); } public override void _Private_BasicGet(string queue, bool autoAck) { - ModelSend(new BasicGet(queue, autoAck)); + var cmd = new BasicGet(queue, autoAck); + ModelSend(ref cmd); } public override void _Private_BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) { - ModelSend(new BasicPublish(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body); + var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + ModelSend(ref cmd, (BasicProperties) basicProperties, body); } public override void _Private_BasicPublishMemory(ReadOnlyMemory exchange, ReadOnlyMemory routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) { - ModelSend(new BasicPublishMemory(exchange, routingKey, mandatory, default), (BasicProperties) basicProperties, body); + var cmd = new BasicPublishMemory(exchange, routingKey, mandatory, default); + ModelSend(ref cmd, (BasicProperties) basicProperties, body); } public override void _Private_BasicRecover(bool requeue) { - ModelSend(new BasicRecover(requeue)); + var cmd = new BasicRecover(requeue); + ModelSend(ref cmd); } public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId) { - ModelSend(new ChannelClose(replyCode, replyText, classId, methodId)); + var cmd = new ChannelClose(replyCode, replyText, classId, methodId); + ModelSend(ref cmd); } public override void _Private_ChannelCloseOk() { - ModelSend(new ChannelCloseOk()); + var cmd = new ChannelCloseOk(); + ModelSend(ref cmd); } public override void _Private_ChannelFlowOk(bool active) { - ModelSend(new ChannelFlowOk(active)); + var cmd = new ChannelFlowOk(active); + ModelSend(ref cmd); } public override void _Private_ChannelOpen() { - ModelRpc(new ChannelOpen(), ProtocolCommandId.ChannelOpenOk); + var cmd = new ChannelOpen(); + ModelRpc(ref cmd, ProtocolCommandId.ChannelOpenOk); } public override void _Private_ConfirmSelect(bool nowait) @@ -102,37 +113,42 @@ public override void _Private_ConfirmSelect(bool nowait) var method = new ConfirmSelect(nowait); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelRpc(method, ProtocolCommandId.ConfirmSelectOk); + ModelRpc(ref method, ProtocolCommandId.ConfirmSelectOk); } } public override void _Private_ConnectionCloseOk() { - ModelSend(new ConnectionCloseOk()); + var cmd = new ConnectionCloseOk(); + ModelSend(ref cmd); } public override void _Private_ConnectionOpen(string virtualHost) { - ModelSend(new ConnectionOpen(virtualHost)); + var cmd = new ConnectionOpen(virtualHost); + ModelSend(ref cmd); } public override void _Private_ConnectionSecureOk(byte[] response) { - ModelSend(new ConnectionSecureOk(response)); + var cmd = new ConnectionSecureOk(response); + ModelSend(ref cmd); } public override void _Private_ConnectionStartOk(IDictionary clientProperties, string mechanism, byte[] response, string locale) { - ModelSend(new ConnectionStartOk(clientProperties, mechanism, response, locale)); + var cmd = new ConnectionStartOk(clientProperties, mechanism, response, locale); + ModelSend(ref cmd); } public override void _Private_UpdateSecret(byte[] newSecret, string reason) { - ModelRpc(new ConnectionUpdateSecret(newSecret, reason), ProtocolCommandId.ConnectionUpdateSecretOk); + var cmd = new ConnectionUpdateSecret(newSecret, reason); + ModelRpc(ref cmd, ProtocolCommandId.ConnectionUpdateSecretOk); } public override void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary arguments) @@ -140,11 +156,11 @@ public override void _Private_ExchangeBind(string destination, string source, st ExchangeBind method = new ExchangeBind(destination, source, routingKey, nowait, arguments); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelRpc(method, ProtocolCommandId.ExchangeBindOk); + ModelRpc(ref method, ProtocolCommandId.ExchangeBindOk); } } @@ -153,11 +169,11 @@ public override void _Private_ExchangeDeclare(string exchange, string type, bool ExchangeDeclare method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelRpc(method, ProtocolCommandId.ExchangeDeclareOk); + ModelRpc(ref method, ProtocolCommandId.ExchangeDeclareOk); } } @@ -166,11 +182,11 @@ public override void _Private_ExchangeDelete(string exchange, bool ifUnused, boo ExchangeDelete method = new ExchangeDelete(exchange, ifUnused, nowait); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelRpc(method, ProtocolCommandId.ExchangeDeleteOk); + ModelRpc(ref method, ProtocolCommandId.ExchangeDeleteOk); } } @@ -179,11 +195,11 @@ public override void _Private_ExchangeUnbind(string destination, string source, ExchangeUnbind method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelRpc(method, ProtocolCommandId.ExchangeUnbindOk); + ModelRpc(ref method, ProtocolCommandId.ExchangeUnbindOk); } } @@ -192,11 +208,11 @@ public override void _Private_QueueBind(string queue, string exchange, string ro QueueBind method = new QueueBind(queue, exchange, routingKey, nowait, arguments); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelRpc(method, ProtocolCommandId.QueueBindOk); + ModelRpc(ref method, ProtocolCommandId.QueueBindOk); } } @@ -205,11 +221,11 @@ public override void _Private_QueueDeclare(string queue, bool passive, bool dura QueueDeclare method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments); if (nowait) { - ModelSend(method); + ModelSend(ref method); } else { - ModelSend(method); + ModelSend(ref method); } } @@ -218,11 +234,11 @@ public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEm QueueDelete method = new QueueDelete(queue, ifUnused, ifEmpty, nowait); if (nowait) { - ModelSend(method); + ModelSend(ref method); return 0xFFFFFFFF; } - return ModelRpc(method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount); + return ModelRpc(ref method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount); } public override uint _Private_QueuePurge(string queue, bool nowait) @@ -230,36 +246,41 @@ public override uint _Private_QueuePurge(string queue, bool nowait) QueuePurge method = new QueuePurge(queue, nowait); if (nowait) { - ModelSend(method); + ModelSend(ref method); return 0xFFFFFFFF; } - return ModelRpc(method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount); + return ModelRpc(ref method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount); } public override void BasicAck(ulong deliveryTag, bool multiple) { - ModelSend(new BasicAck(deliveryTag, multiple)); + var cmd = new BasicAck(deliveryTag, multiple); + ModelSend(ref cmd); } public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue) { - ModelSend(new BasicNack(deliveryTag, multiple, requeue)); + var cmd = new BasicNack(deliveryTag, multiple, requeue); + ModelSend(ref cmd); } public override void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) { - ModelRpc(new BasicQos(prefetchSize, prefetchCount, global), ProtocolCommandId.BasicQosOk); + var cmd = new BasicQos(prefetchSize, prefetchCount, global); + ModelRpc(ref cmd, ProtocolCommandId.BasicQosOk); } public override void BasicRecoverAsync(bool requeue) { - ModelSend(new BasicRecoverAsync(requeue)); + var cmd = new BasicRecoverAsync(requeue); + ModelSend(ref cmd); } public override void BasicReject(ulong deliveryTag, bool requeue) { - ModelSend(new BasicReject(deliveryTag, requeue)); + var cmd = new BasicReject(deliveryTag, requeue); + ModelSend(ref cmd); } public override IBasicProperties CreateBasicProperties() @@ -269,22 +290,26 @@ public override IBasicProperties CreateBasicProperties() public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments) { - ModelRpc(new QueueUnbind(queue, exchange, routingKey, arguments), ProtocolCommandId.QueueUnbindOk); + var cmd = new QueueUnbind(queue, exchange, routingKey, arguments); + ModelRpc(ref cmd, ProtocolCommandId.QueueUnbindOk); } public override void TxCommit() { - ModelRpc(new TxCommit(), ProtocolCommandId.TxCommitOk); + var cmd = new TxCommit(); + ModelRpc(ref cmd, ProtocolCommandId.TxCommitOk); } public override void TxRollback() { - ModelRpc(new TxRollback(), ProtocolCommandId.TxRollbackOk); + var cmd = new TxRollback(); + ModelRpc(ref cmd, ProtocolCommandId.TxRollbackOk); } public override void TxSelect() { - ModelRpc(new TxSelect(), ProtocolCommandId.TxSelectOk); + var cmd = new TxSelect(); + ModelRpc(ref cmd, ProtocolCommandId.TxSelectOk); } protected override bool DispatchAsynchronous(in IncomingCommand cmd) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index 9ab2ece2f4..a44a0b217f 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -147,7 +147,8 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe) _session0.SetSessionClosing(false); try { - _session0.Transmit(new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0)); + var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0); + _session0.Transmit(ref cmd); ClosingLoop(); } catch (IOException ioe) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 4a0c09b4a3..970441826a 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -270,7 +270,8 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) try { // Try to send connection.close wait for CloseOk in the MainLoop - _session0.Transmit(new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0)); + var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0); + _session0.Transmit(ref cmd); } catch (AlreadyClosedException) { diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index 8ea0fee924..100600e42c 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -65,7 +65,7 @@ internal static class Method public const int FrameSize = BaseFrameSize + 2 + 2; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int WriteTo(Span span, ushort channel, in T method) where T : struct, IOutgoingAmqpMethod + public static int WriteTo(Span span, ushort channel, ref T method) where T : struct, IOutgoingAmqpMethod { const int StartClassId = StartPayload; const int StartMethodArguments = StartClassId + 4; @@ -149,21 +149,21 @@ public static Memory GetHeartbeatFrame() } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static ReadOnlyMemory SerializeToFrames(in T method, ushort channelNumber) + public static ReadOnlyMemory SerializeToFrames(ref T method, ushort channelNumber) where T : struct, IOutgoingAmqpMethod { int size = Method.FrameSize + method.GetRequiredBufferSize(); // Will be returned by SocketFrameWriter.WriteLoop var array = ArrayPool.Shared.Rent(size); - int offset = Method.WriteTo(array, channelNumber, method); + int offset = Method.WriteTo(array, channelNumber, ref method); System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); return new ReadOnlyMemory(array, 0, size); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static ReadOnlyMemory SerializeToFrames(in T method, ContentHeaderBase header, ReadOnlyMemory body, ushort channelNumber, int maxBodyPayloadBytes) + public static ReadOnlyMemory SerializeToFrames(ref T method, ContentHeaderBase header, ReadOnlyMemory body, ushort channelNumber, int maxBodyPayloadBytes) where T : struct, IOutgoingAmqpMethod { int remainingBodyBytes = body.Length; @@ -174,7 +174,7 @@ public static ReadOnlyMemory SerializeToFrames(in T method, ContentHead // Will be returned by SocketFrameWriter.WriteLoop var array = ArrayPool.Shared.Rent(size); - int offset = Method.WriteTo(array, channelNumber, method); + int offset = Method.WriteTo(array, channelNumber, ref method); offset += Header.WriteTo(array.AsSpan(offset), channelNumber, header, remainingBodyBytes); var bodySpan = body.Span; while (remainingBodyBytes > 0) diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index cc51f3c0f2..a1b2e99bc7 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -72,7 +72,7 @@ internal interface ISession void Close(ShutdownEventArgs reason, bool notify); bool HandleFrame(in InboundFrame frame); void Notify(); - void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod; - void Transmit(in T cmd, ContentHeaderBase header, ReadOnlyMemory body) where T : struct, IOutgoingAmqpMethod; + void Transmit(ref T cmd) where T : struct, IOutgoingAmqpMethod; + void Transmit(ref T cmd, ContentHeaderBase header, ReadOnlyMemory body) where T : struct, IOutgoingAmqpMethod; } } diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index 8b7e19f597..d600a67eec 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -99,7 +99,7 @@ public void SetSessionClosing(bool closeServerInitiated) } } - public override void Transmit(in T cmd) + public override void Transmit(ref T cmd) { if (_closing && // Are we closing? cmd.ProtocolCommandId != ProtocolCommandId.ConnectionCloseOk && // is this not a close-ok? @@ -110,7 +110,7 @@ public override void Transmit(in T cmd) return; } - base.Transmit(in cmd); + base.Transmit(ref cmd); } } } diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index 9c575ee393..b14d809b9b 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -334,7 +334,7 @@ private void HandleCommand(in IncomingCommand cmd) } } - protected void ModelRpc(in TMethod method, ProtocolCommandId returnCommandId) + protected void ModelRpc(ref TMethod method, ProtocolCommandId returnCommandId) where TMethod : struct, IOutgoingAmqpMethod { var k = new SimpleBlockingRpcContinuation(); @@ -342,7 +342,7 @@ protected void ModelRpc(in TMethod method, ProtocolCommandId returnComm lock (_rpcLock) { Enqueue(k); - Session.Transmit(method); + Session.Transmit(ref method); k.GetReply(ContinuationTimeout, out reply); } @@ -354,7 +354,7 @@ protected void ModelRpc(in TMethod method, ProtocolCommandId returnComm } } - protected TReturn ModelRpc(in TMethod method, ProtocolCommandId returnCommandId, Func, TReturn> createFunc) + protected TReturn ModelRpc(ref TMethod method, ProtocolCommandId returnCommandId, Func, TReturn> createFunc) where TMethod : struct, IOutgoingAmqpMethod { var k = new SimpleBlockingRpcContinuation(); @@ -363,7 +363,7 @@ protected TReturn ModelRpc(in TMethod method, ProtocolCommandI lock (_rpcLock) { Enqueue(k); - Session.Transmit(method); + Session.Transmit(ref method); k.GetReply(ContinuationTimeout, out reply); } @@ -379,19 +379,19 @@ protected TReturn ModelRpc(in TMethod method, ProtocolCommandI } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected void ModelSend(in T method) where T : struct, IOutgoingAmqpMethod + protected void ModelSend(ref T method) where T : struct, IOutgoingAmqpMethod { - Session.Transmit(method); + Session.Transmit(ref method); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected void ModelSend(in T method, ContentHeaderBase header, ReadOnlyMemory body) where T : struct, IOutgoingAmqpMethod + protected void ModelSend(ref T method, ContentHeaderBase header, ReadOnlyMemory body) where T : struct, IOutgoingAmqpMethod { if (!_flowControlBlock.IsSet) { _flowControlBlock.Wait(); } - Session.Transmit(method, header, body); + Session.Transmit(ref method, header, body); } internal void OnCallbackException(CallbackExceptionEventArgs args) diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 4e770673e9..6c0d6a5342 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -129,24 +129,24 @@ public void Notify() OnSessionShutdown(reason); } - public virtual void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod + public virtual void Transmit(ref T cmd) where T : struct, IOutgoingAmqpMethod { if (!IsOpen && cmd.ProtocolCommandId != client.framing.ProtocolCommandId.ChannelCloseOk) { ThrowAlreadyClosedException(); } - Connection.Write(Framing.SerializeToFrames(cmd, ChannelNumber)); + Connection.Write(Framing.SerializeToFrames(ref cmd, ChannelNumber)); } - public void Transmit(in T cmd, ContentHeaderBase header, ReadOnlyMemory body) where T : struct, IOutgoingAmqpMethod + public void Transmit(ref T cmd, ContentHeaderBase header, ReadOnlyMemory body) where T : struct, IOutgoingAmqpMethod { if (!IsOpen && cmd.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk) { ThrowAlreadyClosedException(); } - Connection.Write(Framing.SerializeToFrames(cmd, header, body, ChannelNumber, Connection.MaxPayloadSize)); + Connection.Write(Framing.SerializeToFrames(ref cmd, header, body, ChannelNumber, Connection.MaxPayloadSize)); } private void ThrowAlreadyClosedException() diff --git a/projects/Unit/TestFrameFormatting.cs b/projects/Unit/TestFrameFormatting.cs index b9fd80b004..c2d75f913d 100644 --- a/projects/Unit/TestFrameFormatting.cs +++ b/projects/Unit/TestFrameFormatting.cs @@ -114,7 +114,7 @@ public void MethodFrame() var method = new BasicPublish("E", "R", true, true); int payloadSize = method.GetRequiredBufferSize(); byte[] frameBytes = new byte[Impl.Framing.Method.FrameSize + payloadSize]; - Impl.Framing.Method.WriteTo(frameBytes, Channel, method); + Impl.Framing.Method.WriteTo(frameBytes, Channel, ref method); Assert.Equal(12, Impl.Framing.Method.FrameSize); Assert.Equal(Constants.FrameMethod, frameBytes[0]);