diff --git a/projects/client/ApigenBootstrap/ApigenBootstrap.csproj b/projects/client/ApigenBootstrap/ApigenBootstrap.csproj index b957a3e900..ee56618a04 100755 --- a/projects/client/ApigenBootstrap/ApigenBootstrap.csproj +++ b/projects/client/ApigenBootstrap/ApigenBootstrap.csproj @@ -15,7 +15,7 @@ - + diff --git a/projects/client/RabbitMQ.Client/src/client/api/IBasicPublishBatch.cs b/projects/client/RabbitMQ.Client/src/client/api/IBasicPublishBatch.cs new file mode 100644 index 0000000000..9752bfe7c1 --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/api/IBasicPublishBatch.cs @@ -0,0 +1,47 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 1.1. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2016 Pivotal Software, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v1.1: +// +//--------------------------------------------------------------------------- +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +//--------------------------------------------------------------------------- +namespace RabbitMQ.Client +{ + public interface IBasicPublishBatch + { + void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body); + void Publish(); + } +} diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs index 6642057390..bccbd91fe5 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs @@ -40,6 +40,7 @@ using RabbitMQ.Client.Apigen.Attributes; using RabbitMQ.Client.Events; +using RabbitMQ.Client; using System; using System.Collections.Generic; @@ -278,6 +279,12 @@ void BasicPublish(string exchange, string routingKey, bool mandatory, [AmqpMethodDoNotImplement(null)] void ConfirmSelect(); + /// + /// Creates a BasicPublishBatch instance + /// + [AmqpMethodDoNotImplement(null)] + IBasicPublishBatch CreateBasicPublishBatch(); + /// /// Construct a completely empty content header for use with the Basic content class. /// diff --git a/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs b/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs index 2a481eb890..fe81e29a93 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs @@ -1212,5 +1212,10 @@ protected void RunRecoveryEventHandlers() } } } + + public IBasicPublishBatch CreateBasicPublishBatch() + { + return ((IFullModel)m_delegate).CreateBasicPublishBatch(); + } } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/BasicPublishBatch.cs b/projects/client/RabbitMQ.Client/src/client/impl/BasicPublishBatch.cs new file mode 100644 index 0000000000..4534885a9c --- /dev/null +++ b/projects/client/RabbitMQ.Client/src/client/impl/BasicPublishBatch.cs @@ -0,0 +1,75 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 1.1. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2016 Pivotal Software, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v1.1: +// +//--------------------------------------------------------------------------- +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +namespace RabbitMQ.Client.Impl +{ + using System.Collections.Generic; + using RabbitMQ.Client; + using RabbitMQ.Client.Framing.Impl; + using RabbitMQ.Client.Impl; + + public class BasicPublishBatch : IBasicPublishBatch + { + private List commands = new List(); + private ModelBase model; + internal BasicPublishBatch (ModelBase model) + { + this.model = model; + } + + public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body) + { + var bp = basicProperties == null ? model.CreateBasicProperties() : basicProperties; + var method = new BasicPublish + { + m_exchange = exchange, + m_routingKey = routingKey, + m_mandatory = mandatory + }; + + commands.Add(new Command(method, (ContentHeaderBase)bp, body)); + } + + public void Publish() + { + model.SendCommands(commands); + } + } +} \ No newline at end of file diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Command.cs b/projects/client/RabbitMQ.Client/src/client/impl/Command.cs index 799b6fa018..a81675b728 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Command.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Command.cs @@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection) } } + + public void TransmitAsSingleFrame(int channelNumber, Connection connection) { connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method)); @@ -166,5 +168,32 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection) connection.WriteFrameSet(frames); } + + + public static List CalculateFrames(int channelNumber, Connection connection, IList commands) + { + var frames = new List(); + + foreach (var cmd in commands) + { + frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method)); + if (cmd.Method.HasContent) + { + var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled. + + frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length)); + var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax); + var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize; + for (int offset = 0; offset < body.Length; offset += bodyPayloadMax) + { + var remaining = body.Length - offset; + var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; + frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count)); + } + } + } + + return frames; + } } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs index 3db607158b..d5045a43b4 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Connection.cs @@ -59,6 +59,7 @@ using System.Text; using System.Threading; +using System.Reflection; namespace RabbitMQ.Client.Framing.Impl { @@ -105,7 +106,16 @@ public class Connection : IConnection private Timer _heartbeatWriteTimer; private Timer _heartbeatReadTimer; private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false); - private AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false); + +#if CORECLR + private static string version = typeof(Connection).GetTypeInfo().Assembly + .GetCustomAttribute() + .InformationalVersion; +#else + private static string version = typeof(Connection).Assembly + .GetCustomAttribute() + .InformationalVersion; +#endif // true if we haven't finished connection negotiation. @@ -354,9 +364,6 @@ IProtocol IConnection.Protocol public static IDictionary DefaultClientProperties() { - - string version = "0.0.0.0";// assembly.GetName().Version.ToString(); - //TODO: Get the rest of this data from the Assembly Attributes IDictionary table = new Dictionary(); table["product"] = Encoding.UTF8.GetBytes("RabbitMQ"); table["version"] = Encoding.UTF8.GetBytes(version); @@ -537,7 +544,6 @@ public void FinishClose() { // Notify hearbeat loops that they can leave m_heartbeatRead.Set(); - m_heartbeatWrite.Set(); m_closed = true; MaybeStopHeartbeatTimers(); @@ -1170,13 +1176,11 @@ public override string ToString() public void WriteFrame(OutboundFrame f) { m_frameHandler.WriteFrame(f); - m_heartbeatWrite.Set(); } public void WriteFrameSet(IList f) { m_frameHandler.WriteFrameSet(f); - m_heartbeatWrite.Set(); } ///API-side invocation of connection abort. diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs b/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs index cc517e4e56..c0cc36ae0f 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs @@ -39,6 +39,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; namespace RabbitMQ.Client.Impl { @@ -79,5 +80,6 @@ public interface ISession void HandleFrame(InboundFrame frame); void Notify(); void Transmit(Command cmd); + void Transmit(IList cmd); } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 0b66a4cb55..4d3d6900e9 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -1205,6 +1205,26 @@ public abstract void BasicNack(ulong deliveryTag, bool multiple, bool requeue); + internal void AllocatatePublishSeqNos(int count) + { + var c = 0; + lock (m_unconfirmedSet.SyncRoot) + { + while(c < count) + { + if (NextPublishSeqNo > 0) + { + if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) + { + m_unconfirmedSet.Add(NextPublishSeqNo); + } + NextPublishSeqNo++; + } + c++; + } + } + } + public void BasicPublish(string exchange, string routingKey, bool mandatory, @@ -1273,6 +1293,10 @@ public void ConfirmSelect() /////////////////////////////////////////////////////////////////////////// public abstract IBasicProperties CreateBasicProperties(); + public IBasicPublishBatch CreateBasicPublishBatch() + { + return new BasicPublishBatch(this); + } public void ExchangeBind(string destination, @@ -1496,6 +1520,13 @@ public void WaitForConfirmsOrDie(TimeSpan timeout) } } + internal void SendCommands(IList commands) + { + m_flowControlBlock.WaitOne(); + AllocatatePublishSeqNos(commands.Count); + Session.Transmit(commands); + } + protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack) { lock (m_unconfirmedSet.SyncRoot) @@ -1534,6 +1565,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo return k.m_result; } + public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation { public IBasicConsumer m_consumer; diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs index 7b380137ea..d1634298e0 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs @@ -41,6 +41,7 @@ using System; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; +using System.Collections.Generic; namespace RabbitMQ.Client.Impl { @@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd) // of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock. cmd.Transmit(ChannelNumber, Connection); } + public virtual void Transmit(IList commands) + { + Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands)); + } } } diff --git a/projects/client/Unit/src/unit/TestBasicPublishBatch.cs b/projects/client/Unit/src/unit/TestBasicPublishBatch.cs new file mode 100755 index 0000000000..f779865b2c --- /dev/null +++ b/projects/client/Unit/src/unit/TestBasicPublishBatch.cs @@ -0,0 +1,67 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 1.1. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2016 Pivotal Software, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v1.1: +// +//--------------------------------------------------------------------------- +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using NUnit.Framework; +using RabbitMQ.Client; +using RabbitMQ.Client.Impl; +using System; + +namespace RabbitMQ.Client.Unit +{ + internal class TestBasicPublishBatch : IntegrationFixture + { + [Test] + public void TestBasicPublishBatchSend() + { + Model.ConfirmSelect(); + Model.QueueDeclare(queue: "test-message-batch-a", durable: false); + Model.QueueDeclare(queue: "test-message-batch-b", durable: false); + var batch = Model.CreateBasicPublishBatch(); + batch.Add("", "test-message-batch-a", false, null, new byte [] {}); + batch.Add("", "test-message-batch-b", false, null, new byte [] {}); + batch.Publish(); + Model.WaitForConfirmsOrDie(TimeSpan.FromSeconds(15)); + var resultA = Model.BasicGet("test-message-batch-a", true); + Assert.NotNull(resultA); + var resultB = Model.BasicGet("test-message-batch-b", true); + Assert.NotNull(resultB); + } + } +}