diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs index 6642057390..57c6f7d67a 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs @@ -226,6 +226,11 @@ string BasicConsume( void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body); + [AmqpMethodDoNotImplement(null)] + void BasicBatchPublish(string exchange, string routingKey, bool mandatory, + IEnumerable messages); + + /// /// Configures QoS parameters of the Basic content-class. /// @@ -552,4 +557,8 @@ void QueueDeclareNoWait(string queue, bool durable, /// TimeSpan ContinuationTimeout { get; set; } } + public class BatchMessage{ + public byte[] Body { get; set; } + public IBasicProperties basicProperties { get; set; } + } } diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs b/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs index 59afcc4919..d112f57b2e 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModelExtensions.cs @@ -103,6 +103,10 @@ public static void BasicPublish(this IModel model, string exchange, string routi { model.BasicPublish(exchange, routingKey, false, basicProperties, body); } + public static void BasicBatchPublish(this IModel model, string exchange, string routingKey, IEnumerable messages) + { + model.BasicBatchPublish(exchange, routingKey, false, messages); + } /// /// (Spec method) Convenience overload of BasicPublish. diff --git a/projects/client/RabbitMQ.Client/src/client/impl/Command.cs b/projects/client/RabbitMQ.Client/src/client/impl/Command.cs index 799b6fa018..5392077fcb 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/Command.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/Command.cs @@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection) } } + + public void TransmitAsSingleFrame(int channelNumber, Connection connection) { connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method)); @@ -166,5 +168,31 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection) connection.WriteFrameSet(frames); } + + + public static IList CalculateFrames(int channelNumber, Connection connection, IEnumerable commands) + { + List frames = new List(); + + foreach (var cmd in commands) + { + frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method)); + if (cmd.Method.HasContent) + { + var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled. + + frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length)); + var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax); + var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize; + for (int offset = 0; offset < body.Length; offset += bodyPayloadMax) + { + var remaining = body.Length - offset; + var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; + frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count)); + } + } + } + return frames; + } } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs b/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs index cc517e4e56..4392e92cef 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ISession.cs @@ -39,7 +39,8 @@ //--------------------------------------------------------------------------- using System; - +using System.Collections.Generic; + namespace RabbitMQ.Client.Impl { public interface ISession @@ -79,5 +80,6 @@ public interface ISession void HandleFrame(InboundFrame frame); void Notify(); void Transmit(Command cmd); + void Transmit(IEnumerable cmd); } } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 0b66a4cb55..1592ef2431 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -472,6 +472,19 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body) Session.Transmit(new Command(method, header, body)); } } + public void ModelSend(MethodBase method, IEnumerable messages) + { + if (method.HasContent) + { + m_flowControlBlock.WaitOne(); + } + List commands = new List(); + foreach (var message in messages) + { + commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body)); + } + Session.Transmit(commands); + } public virtual void OnBasicAck(BasicAckEventArgs args) { @@ -1233,6 +1246,50 @@ public void BasicPublish(string exchange, body); } + public void BasicBatchPublish(string exchange, + string routingKey, + bool mandatory, + IEnumerable messages) + { + foreach (var message in messages) + { + if (message.basicProperties == null) + { + message.basicProperties = CreateBasicProperties(); + } + + if (NextPublishSeqNo > 0) + { + lock (m_unconfirmedSet.SyncRoot) + { + if (!m_unconfirmedSet.Contains(NextPublishSeqNo)) + { + m_unconfirmedSet.Add(NextPublishSeqNo); + } + NextPublishSeqNo++; + } + } + } + + _Private_BasicBatchPublish(exchange, + routingKey, + mandatory, + messages); + } + public void _Private_BasicBatchPublish( +string @exchange, +string @routingKey, +bool @mandatory, +//bool @immediate, +IEnumerable messages) + { + BasicPublish __req = new BasicPublish(); + __req.m_exchange = @exchange; + __req.m_routingKey = @routingKey; + __req.m_mandatory = @mandatory; + //__req.m_immediate = @immediate; + ModelSend(__req, messages); + } public abstract void BasicQos(uint prefetchSize, ushort prefetchCount, bool global); diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs index 7b380137ea..0326d89a08 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SessionBase.cs @@ -41,7 +41,8 @@ using System; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; - +using System.Collections.Generic; + namespace RabbitMQ.Client.Impl { public abstract class SessionBase : ISession @@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd) // of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock. cmd.Transmit(ChannelNumber, Connection); } + public virtual void Transmit(IEnumerable commands) + { + Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands)); + } } }