9 changes: 9 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
@@ -226,6 +226,11 @@ string BasicConsume(
void BasicPublish(string exchange, string routingKey, bool mandatory,
IBasicProperties basicProperties, byte[] body);

[AmqpMethodDoNotImplement(null)]
void BasicBatchPublish(string exchange, string routingKey, bool mandatory,
Collaborator:
IEnumerable is a great multi-purpose choice when you look at it from the caller's perspective. One of its downsides is that it requires you to be aware of multiple enumeration, you cannot access things like count or length without using LINQ (which allocates), and it forces the library to copy items into internal materialized data structures.

If the API is considered lower level, I would lean towards using an array if possible, but as I said, IEnumerable offers more convenience from the caller's perspective. Just some food for thought.
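
To illustrate the array-based alternative being described here (a hypothetical signature, not part of this PR), the overload could take a materialized collection so the implementation can read its length and index it directly:

    // Hypothetical sketch only: an array keeps Length and indexing available
    // without LINQ, extra copies, or the risk of enumerating the source twice.
    [AmqpMethodDoNotImplement(null)]
    void BasicBatchPublish(string exchange, string routingKey, bool mandatory,
                           BatchMessage[] messages);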

Contributor:
It's generally OK if the implementation ensures that it only enumerates once. To do a count, it can enumerate into, e.g., a List<T>, and inspect the Count property.
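
A minimal sketch of that pattern (editorial illustration, not code from this PR):

    // Materialize the caller's sequence exactly once, then reuse the list
    // for Count and for building the outgoing frames.
    var materialized = messages as IList<BatchMessage> ?? new List<BatchMessage>(messages);
    int count = materialized.Count;   // no LINQ Count(), no second enumeration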

Contributor:
Sorry, I see what you're saying: even a ToList() results in allocations internally. This is a valid concern for a hot code path.

IEnumerable<BatchMessage> messages);


/// <summary>
/// Configures QoS parameters of the Basic content-class.
/// </summary>
@@ -552,4 +557,8 @@ void QueueDeclareNoWait(string queue, bool durable,
/// </summary>
TimeSpan ContinuationTimeout { get; set; }
}
public class BatchMessage{
Contributor:
This name is confusing: "what is being batched there? how do I send a batch of messages using this thing?". It's an implementation detail leaked into the public API (which I think can be avoided).

Contributor Author:
        List<BatchMessage> batchMessages = new List<BatchMessage>();
        foreach(var item in messages)
        {
            batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
        }

        Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);

Contributor Author (@YulerB, Oct 16, 2017):
A larger view of the process:

            IBasicProperties Props;
            var message = System.Text.Encoding.UTF8.GetBytes("The quick brown fox jumps over the lazy dog!");
            var key = Guid.NewGuid().ToString();
            List<byte[]> messages = new List<byte[]>();
            for (int k = 0; k < 100000; k++)
            {
                messages.Add(message);
                if(k % 100 == 0)
                {
                    Props = Model?.CreateBasicProperties();
                    Props.ContentEncoding = Encoding.UTF8.BodyName;
                    Props.DeliveryMode = 2;
                    Props.CorrelationId = key;
                    List<BatchMessage> batchMessages = new List<BatchMessage>();
                    foreach (var item in messages)
                    {
                        batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
                    }

                    Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);
                    messages.Clear();
                }
            }
            if (messages.Count > 0)
            {
                Props = Model?.CreateBasicProperties();
                Props.ContentEncoding = Encoding.UTF8.BodyName;
                Props.DeliveryMode = 2;
                Props.CorrelationId = key;
                List<BatchMessage> batchMessages = new List<BatchMessage>();
                foreach (var item in messages)
                {
                    batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
                }

                Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);
            }

Contributor Author:

It stores the data that varies between BasicPublish invocations, namely the BasicProperties and the body bytes.
The name I chose is BatchMessage; it's a single message that is part of the batch.
The batch is an IEnumerable that will be compiled into a single stream and sent on the socket.
It would be possible to change this to Batch, where we create another class that is simply
public class Batch : List<BatchMessage> {}

Contributor Author:

@michaelklishin, as owner of the source, please consider this:

A way to avoid batching and still gain the performance is to buffer outbound frames and send them on a separate thread.

This, however, causes behavioural changes in the client, so I chose to write the Batch functionality instead. Plus, it fits well with many of the use cases for RabbitMQ.

Interestingly though, with an outbound frame buffer and a sending thread, acks also get grouped and sent as a single stream on the socket, which increases read throughput. To achieve this, however, I took a dependency on BufferBlock from TPL Dataflow.

I don't believe BufferedStream is useful for writes because it doesn't automatically flush periodically.
A new bespoke stream might be, with a one/two/x-second timer to flush any remaining data.
But again, this may have similar behavioural issues.
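
To make that idea concrete, here is a rough, hypothetical sketch of an outbound buffer drained by a dedicated sending task; the OutboundFrameBuffer and Enqueue names are invented for illustration and are not part of this PR:

    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Hypothetical sketch of a buffered outbound writer; not code from this PR.
    class OutboundFrameBuffer
    {
        private readonly BufferBlock<byte[]> _frames = new BufferBlock<byte[]>();

        public OutboundFrameBuffer(System.IO.Stream socketStream)
        {
            // Dedicated sender: drains whatever frames have accumulated and
            // writes them in one burst, so many small frames share one flush.
            Task.Run(async () =>
            {
                while (await _frames.OutputAvailableAsync().ConfigureAwait(false))
                {
                    while (_frames.TryReceive(out var frame))
                    {
                        socketStream.Write(frame, 0, frame.Length);
                    }
                    socketStream.Flush();
                }
            });
        }

        // Producers (publishes, acks, ...) just post frames and return.
        public void Enqueue(byte[] frame) => _frames.Post(frame);

        public void Complete() => _frames.Complete();
    }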

public byte[] Body { get; set; }
Collaborator:
How about making this thing immutable once created?
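
For example, an immutable shape (one possible sketch, not the PR's current code) might look like this:

    // Sketch: both values are supplied at construction and never change.
    public class BatchMessage
    {
        public BatchMessage(IBasicProperties basicProperties, byte[] body)
        {
            BasicProperties = basicProperties;
            Body = body;
        }

        public IBasicProperties BasicProperties { get; }
        public byte[] Body { get; }
    }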

public IBasicProperties basicProperties { get; set; }
}
}
@@ -103,6 +103,10 @@ public static void BasicPublish(this IModel model, string exchange, string routi
{
model.BasicPublish(exchange, routingKey, false, basicProperties, body);
}
public static void BasicBatchPublish(this IModel model, string exchange, string routingKey, IEnumerable<BatchMessage> messages)
{
model.BasicBatchPublish(exchange, routingKey, false, messages);
}

/// <summary>
/// (Spec method) Convenience overload of BasicPublish.
28 changes: 28 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/Command.cs
@@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection)
}
}



public void TransmitAsSingleFrame(int channelNumber, Connection connection)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
@@ -166,5 +168,31 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)

connection.WriteFrameSet(frames);
}


public static IList<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IEnumerable<Command> commands)
{
List<OutboundFrame> frames = new List<Impl.OutboundFrame>();

foreach (var cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.

frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
{
var remaining = body.Length - offset;
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
}
}
}
return frames;
}
}
}
4 changes: 3 additions & 1 deletion projects/client/RabbitMQ.Client/src/client/impl/ISession.cs
@@ -39,7 +39,8 @@
//---------------------------------------------------------------------------

using System;

using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
public interface ISession
@@ -79,5 +80,6 @@ public interface ISession
void HandleFrame(InboundFrame frame);
void Notify();
void Transmit(Command cmd);
void Transmit(IEnumerable<Command> cmd);
}
}
57 changes: 57 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
@@ -472,6 +472,19 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
Session.Transmit(new Command(method, header, body));
}
}
public void ModelSend(MethodBase method, IEnumerable<BatchMessage> messages)
{
if (method.HasContent)
{
m_flowControlBlock.WaitOne();
}
List<Command> commands = new List<Impl.Command>();
foreach (var message in messages)
{
commands.Add(new Command(method, (ContentHeaderBase)message.basicProperties, message.Body));
}
Session.Transmit(commands);
}

public virtual void OnBasicAck(BasicAckEventArgs args)
{
@@ -1233,6 +1246,50 @@ public void BasicPublish(string exchange,
body);
}

public void BasicBatchPublish(string exchange,
string routingKey,
bool mandatory,
IEnumerable<BatchMessage> messages)
{
foreach (var message in messages)
{
if (message.basicProperties == null)
{
message.basicProperties = CreateBasicProperties();
}

if (NextPublishSeqNo > 0)
{
lock (m_unconfirmedSet.SyncRoot)
Collaborator:
Would it change anything if we acquired all the sequence numbers in one go inside a single lock, instead of reacquiring the lock for each message? (A sketch of that idea follows after this method.)

{
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
{
m_unconfirmedSet.Add(NextPublishSeqNo);
}
NextPublishSeqNo++;
}
}
}

_Private_BasicBatchPublish(exchange,
routingKey,
mandatory,
messages);
}
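
Regarding the reviewer's question above, a minimal sketch of reserving the whole range of sequence numbers under one lock. This is an editorial illustration only, not code from this PR; the ReserveSequenceNumbers name and messageCount parameter are invented, while m_unconfirmedSet and NextPublishSeqNo come from the surrounding diff:

    // Sketch: reserve all publish sequence numbers for the batch in a single
    // critical section, instead of re-taking the lock once per message.
    private ulong ReserveSequenceNumbers(int messageCount)
    {
        lock (m_unconfirmedSet.SyncRoot)
        {
            ulong first = NextPublishSeqNo;
            for (int i = 0; i < messageCount; i++)
            {
                if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
                {
                    m_unconfirmedSet.Add(NextPublishSeqNo);
                }
                NextPublishSeqNo++;
            }
            return first;  // sequence number of the first message in the batch
        }
    }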
public void _Private_BasicBatchPublish(
string @exchange,
string @routingKey,
bool @mandatory,
//bool @immediate,
IEnumerable<BatchMessage> messages)
{
BasicPublish __req = new BasicPublish();
__req.m_exchange = @exchange;
__req.m_routingKey = @routingKey;
__req.m_mandatory = @mandatory;
//__req.m_immediate = @immediate;
ModelSend(__req, messages);
}
public abstract void BasicQos(uint prefetchSize,
ushort prefetchCount,
bool global);
@@ -41,7 +41,8 @@
using System;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;

using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
public abstract class SessionBase : ISession
Expand Down Expand Up @@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd)
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
}
public virtual void Transmit(IEnumerable<Command> commands)
{
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
}
}
}