
Conversation

@YulerB
Contributor

@YulerB YulerB commented Oct 16, 2017

Proposed Changes

Batch publish allows sending multiple messages in one stream on the socket.
Sending in batches improves performance by reducing the number of TCP/IP messages sent and TCP/IP acknowledgments received.
This change compiles the commands for all publish messages into a memory buffer that is posted to the socket as a single stream.
Closing the socket/model/connection before the buffer has completed sending does not guarantee message delivery.
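To illustrate the intended usage, here is a minimal sketch against the API proposed in this PR (model, payloads, exchangeName and routingKey are placeholders):

    // Sketch only: BatchMessage and IModel.BasicBatchPublish are the members proposed in this PR.
    var props = model.CreateBasicProperties();
    props.DeliveryMode = 2; // persistent

    var batch = new List<BatchMessage>();
    foreach (var body in payloads) // payloads: IEnumerable<byte[]>
    {
        batch.Add(new BatchMessage { basicProperties = props, Body = body });
    }

    // Every message in the batch goes to the same exchange/routing key on this channel
    // and is written to the socket as a single stream.
    model.BasicBatchPublish(exchangeName, routingKey, batch);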

Types of Changes

What types of changes does your code introduce to this project?

  • Bugfix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation (correction or otherwise)
  • Cosmetics (whitespace, appearance)

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

Batch publish allows sending multiple messages in one stream on the socket.
Sending in batches improves performance by reducing the number of TCP/IP messages sent and TCP/IP acknowledgments received.
This change compiles the commands for all publish messages into a memory buffer that is posted to the socket as a single stream.
Closing the socket/model/connection before the buffer has completed sending does not guarantee message delivery.
IBasicProperties basicProperties,
byte[] body);

//public abstract void _Private_BasicBatchPublish(string exchange,
Contributor

We have to do something about commented-out code in pull requests.

}
}

_Private_BasicBatchPublish(exchange,
Contributor

These methods are produced by the ApiGen project, not added manually.

Contributor Author

This is where we all need a little help.
I don't know whether this is considered protocol-specific or client-API-specific.

}

public void BasicBatchPublish(string exchange,
string routingKey,
Contributor

It could be GitHub diff formatting, but the indentation seems to be off here.

@michaelklishin
Contributor

So these "batch messages" are meant to have shared metadata? How about introducing some tests that demonstrate the idea?

@michaelklishin
Contributor

This PR does not compile because BatchMessage is not defined anywhere (wasn't added to the commit?):

src/client/impl/ModelBase.cs(475,62): error CS0246: The type or namespace name 'BatchMessage' could not be found (are you missing a using directive or an assembly reference?) [/Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj]
src/client/impl/ModelBase.cs(1258,17): error CS0246: The type or namespace name 'BatchMessage' could not be found (are you missing a using directive or an assembly reference?) [/Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj]
src/client/impl/ModelBase.cs(1290,13): error CS0246: The type or namespace name 'BatchMessage' could not be found (are you missing a using directive or an assembly reference?) [/Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj]

Addition of API items for BatchPublish
@YulerB
Contributor Author

YulerB commented Oct 16, 2017

Added missing items

@YulerB
Contributor Author

YulerB commented Oct 16, 2017

Batch messages do not have shared metadata; they only need to be sent to the same destination using the same model/channel.
I find I have to load a lot of files and database tables and place the contents on the bus for processing.
We find the performance of sending on the bus slow.
Using this method we have managed to double the send rate.
While my local tests reuse the same BasicProperties for all messages, it's not a requirement.

Contributor

@michaelklishin michaelklishin left a comment

ModelBase.cs has line-ending changes on over 3,000 lines.

/// </summary>
TimeSpan ContinuationTimeout { get; set; }
}
public class BatchMessage{
Contributor

This name is confusing: "what is being batched here? how do I send a batch of messages using this thing?" It's an implementation detail leaked into the public API (which I think can be avoided).

Contributor Author

        List<BatchMessage> batchMessages = new List<BatchMessage>();
        foreach(var item in messages)
        {
            batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
        }

        Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);

Contributor Author

@YulerB YulerB Oct 16, 2017

A larger view of the process:

            IBasicProperties Props;
            var message = System.Text.Encoding.UTF8.GetBytes("The quick brown fox jumps over the lazy dog!");
            var key = Guid.NewGuid().ToString();
            List<byte[]> messages = new List<byte[]>();
            for (int k = 0; k < 100000; k++)
            {
                messages.Add(message);
                if(k % 100 == 0)
                {
                    Props = Model?.CreateBasicProperties();
                    Props.ContentEncoding = Encoding.UTF8.BodyName;
                    Props.DeliveryMode = 2;
                    Props.CorrelationId = key;
                    List<BatchMessage> batchMessages = new List<BatchMessage>();
                    foreach (var item in messages)
                    {
                        batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
                    }

                    Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);
                    messages.Clear();
                }
            }
            if (messages.Count > 0)
            {
                Props = Model?.CreateBasicProperties();
                Props.ContentEncoding = Encoding.UTF8.BodyName;
                Props.DeliveryMode = 2;
                Props.CorrelationId = key;
                List<BatchMessage> batchMessages = new List<BatchMessage>();
                foreach (var item in messages)
                {
                    batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
                }

                Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);
            }

/// </summary>
TimeSpan ContinuationTimeout { get; set; }
}
public class BatchMessage{
Contributor Author

It stores the data that varies between BasicPublish invocations, namely the BasicProperties and the body bytes.
The name I chose is BatchMessage; it’s a single message as part of the batch.
The batch is an IEnumerable that will be compiled into a single stream and sent on the socket.
It would be possible to change this to Batch, where we create another class that is simply
public class Batch : List<BatchMessage>{}

Contributor Author

As the owner of the source, @michaelklishin, please consider this:

A way to avoid batching and still gain the performance is to buffer outbound frames and send them on a separate thread.

This, however, causes behavioural changes in the client, so I chose to write batch functionality instead. Plus, it fits many of the use cases for RabbitMQ well.

Interestingly, using the outbound frame buffer and a sending thread, you also get the benefit of acks being grouped and sent as a single stream on the socket, which increases read throughput. To achieve this, however, I took a dependency on BufferBlock from TPL Dataflow.

I don't believe BufferedStream is useful for writes because it doesn't automatically flush periodically.
A new bespoke stream might be, though, with a one/two/x-second timer to flush any remaining data.
But again, this may have similar behavioural issues.
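For reference, a rough sketch of that outbound-frame-buffer idea using BufferBlock from TPL Dataflow (not part of this PR; the class and member names are placeholders):

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Sketch only: queues outbound frames and writes whatever has accumulated
    // in one go on the underlying stream, so several frames (or acks) share a socket write.
    public class OutboundFrameBuffer
    {
        private readonly BufferBlock<ArraySegment<byte>> queue = new BufferBlock<ArraySegment<byte>>();
        private readonly Stream output; // placeholder for the network stream
        private readonly Task pump;

        public OutboundFrameBuffer(Stream output)
        {
            this.output = output;
            pump = Task.Run(PumpAsync);
        }

        public void Enqueue(byte[] frame)
        {
            queue.Post(new ArraySegment<byte>(frame));
        }

        public void Complete()
        {
            queue.Complete();
            pump.Wait();
        }

        private async Task PumpAsync()
        {
            while (await queue.OutputAvailableAsync())
            {
                // Drain everything currently queued and send it as a single write.
                if (queue.TryReceiveAll(out IList<ArraySegment<byte>> frames))
                {
                    using (var buffered = new MemoryStream())
                    {
                        foreach (var f in frames)
                        {
                            buffered.Write(f.Array, f.Offset, f.Count);
                        }
                        output.Write(buffered.GetBuffer(), 0, (int)buffered.Length);
                    }
                    output.Flush();
                }
            }
        }
    }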

@michaelklishin
Contributor

The feature overall is reasonable (and was suggested before). Are there any benchmarks that demonstrate the difference?

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client.Events;
using System.Threading.Tasks;
using System.Linq;
using System.Threading;

namespace RabbitMQTests
{
    [TestClass]
    public class TestBatchPublish
    {
        static readonly byte[] message = System.Text.Encoding.UTF8.GetBytes("The quick brown fox jumps over the lazy dog!");
        static readonly string key = Guid.NewGuid().ToString();
        static readonly string mimeType = "text/plain";
        Publisher1 publisher = new Publisher1();
        Publisher1 publisher1= new Publisher1();
        public void TestRabbitMQSendBatch()
        {
            publisher1.Init("localhost", -1, 30, "Baldur.Exchange", "BasicBatchPublishTestQueue");//.Setup();

            List<byte[]> messages = new List<byte[]>();
            while (true)
            {
                messages.Add(message);
                if (messages.Count % 25000 == 0)
                {
                    publisher1.Publish(key, messages, mimeType);
                    messages.Clear();
                }
            }
        }
        public void TestRabbitMQSend()
        {
                publisher.Init("localhost", -1, 30, "Baldur.Exchange", "BasicPublishTestQueue");//.Setup();

                while(true)
                {
                     publisher.Publish(key, message, mimeType);
                }
        }
        [TestMethod]
        public void TestBoth()
        {
            Task.WaitAll(new Task[] { Task.Run(() => { TestRabbitMQSend(); }), Task.Run(() => { TestRabbitMQSendBatch(); }) }, TimeSpan.FromSeconds(30));
            QueueCounter q1 = new QueueCounter();
            q1.Init("localhost", -1, 30, "Baldur.Exchange", "BasicBatchPublishTestQueue");//.Setup();
            QueueCounter q2 = new QueueCounter();
            q2.Init("localhost", -1, 30, "Baldur.Exchange", "BasicPublishTestQueue");//.Setup();
            Console.WriteLine($"Batch: {q1.MessageCount}");
            Console.WriteLine($"Single: {q2.MessageCount}");
            Assert.IsTrue(q1.MessageCount > q2.MessageCount);
        }
        public class Publisher1 : IDisposable
        {
            private static readonly Dictionary<string, object> clientProperties = new Dictionary<string, object>
                {
                    { "ComputerName", Environment.MachineName },
                    { "Is64BitProcess", Environment.Is64BitProcess },
                    { "Is64BitOperatingSystem", Environment.Is64BitOperatingSystem },
                    { "OSVersion", Environment.OSVersion.ToString() },
                    { "SystemPageSize", Environment.SystemPageSize },
                    { "UserDomainName", Environment.UserDomainName },
                    { "UserName", Environment.UserName },
                    { "CLR_Version", Environment.Version.ToString() },
                    { "ProcessorCount", Environment.ProcessorCount },
                    { "WorkingSet", Environment.WorkingSet }
        };
            private IConnection Connection;
            private ConnectionFactory Factory;
            private IModel Model;
            private string exchangeName;
            private string routingKey;

            public void Init(string hosts, int port, int heartbeatInterval,// Dictionary<string, object> clientProperties,
                string exchange, string queue
                )
            {
                routingKey = queue;
                exchangeName = exchange;
                Factory = new RabbitMQ.Client.ConnectionFactory
                {
                    HostName = hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries).First().Trim(),
                    Port = port,
                    AutomaticRecoveryEnabled = false,
                    RequestedConnectionTimeout = 45000,
                    TopologyRecoveryEnabled = false,
                    UseBackgroundThreadsForIO = true,
                    DispatchConsumersAsync = false,
                    VirtualHost = "/",
                    NetworkRecoveryInterval = TimeSpan.FromSeconds(20),
                    Protocol = Protocols.DefaultProtocol,
                    RequestedHeartbeat = heartbeatInterval < 10 ? Convert.ToUInt16(30) : Convert.ToUInt16(heartbeatInterval),
                    ClientProperties = clientProperties,
                    UserName = "guest",
                    Password = "guest"
                };


                Connection = Factory.CreateConnection(hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
                Model = Connection.CreateModel();

                const string deadLetterSuffix = ".DeadLetter";
                const string deadLetterExchange = "Baldur.DeadLetter";
                Model.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null);
                Model.ExchangeDeclare(String.Concat(exchange, deadLetterSuffix), ExchangeType.Direct, true, false, null);
                Model.QueueDeclareNoWait(deadLetterExchange, true, false, false, null);
                Model.QueueBind(deadLetterExchange, String.Concat(exchange, deadLetterSuffix), queue, null);

                Dictionary<string, object> args = new Dictionary<string, object>();

                args[RabbitMQ.Client.Headers.XMaxPriority] = 10;
                args[RabbitMQ.Client.Headers.XDeadLetterExchange] = String.Concat(exchange, deadLetterSuffix);
                args[RabbitMQ.Client.Headers.XDeadLetterRoutingKey] = queue;

                try
                {
                    Model.QueueDeclareNoWait(queue, true, false, false, args);
                }
                catch (Exception ex)
                {
                    const string messagePrefix = "RabbitQueuePublisher.Setup():";
                    Console.WriteLine(String.Concat(messagePrefix, ex.Message));
                }

                Model.QueueBind(queue, exchange, queue, null);
            }

            const byte PERSISTENT = 2;
            const string jsonMimeType = "application/json";
            IBasicProperties Props;
            public void Publish(string correlationId, byte[] message, string mimeType)
            {
                if (Props == null)
                {
                    Props = Model?.CreateBasicProperties();
                    Props.ContentType = mimeType;
                    if (mimeType == jsonMimeType)
                        Props.ContentEncoding = Encoding.UTF8.BodyName;
                    Props.DeliveryMode = PERSISTENT;
                    Props.MessageId = Guid.NewGuid().ToString();
                    Props.CorrelationId = correlationId;
                }

                Model?.BasicPublish(
                    exchangeName,
                    routingKey,
                    Props,
                    message
                );
            }
            public void Publish(string correlationId, IEnumerable<byte[]> messages, string mimeType)
            {
                Props = Model?.CreateBasicProperties();
                Props.ContentType = mimeType;
                Props.ContentEncoding = Encoding.UTF8.BodyName;
                Props.DeliveryMode = PERSISTENT;
                Props.CorrelationId = correlationId;
                List<BatchMessage> batchMessages = new List<BatchMessage>();
                foreach (var item in messages)
                {
                    batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
                }

                Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);
            }

            #region IDisposable Support
            private bool disposedValue = false; // To detect redundant calls

            protected virtual void Dispose(bool disposing)
            {
                if (!disposedValue)
                {
                    if (disposing)
                    {
                        // TODO: dispose managed state (managed objects).
                        this.Model.Close();
                        this.Connection.Close();
                    }
                    this.Model = null;
                    this.Connection = null;
                    this.Factory = null;
                    disposedValue = true;
                }
            }

            // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
            // ~Publisher() {
            //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            //   Dispose(false);
            // }

            // This code added to correctly implement the disposable pattern.
            public void Dispose()
            {
                // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
                Dispose(true);
                // TODO: uncomment the following line if the finalizer is overridden above.
                // GC.SuppressFinalize(this);
            }
            #endregion
        }
        public class QueueCounter : IDisposable
        {
            private static readonly Dictionary<string, object> clientProperties = new Dictionary<string, object>
                {
                    { "ComputerName", Environment.MachineName },
                    { "Is64BitProcess", Environment.Is64BitProcess },
                    { "Is64BitOperatingSystem", Environment.Is64BitOperatingSystem },
                    { "OSVersion", Environment.OSVersion.ToString() },
                    { "SystemPageSize", Environment.SystemPageSize },
                    { "UserDomainName", Environment.UserDomainName },
                    { "UserName", Environment.UserName },
                    { "CLR_Version", Environment.Version.ToString() },
                    { "ProcessorCount", Environment.ProcessorCount },
                    { "WorkingSet", Environment.WorkingSet }
        };
            private IConnection Connection;
            private ConnectionFactory Factory;
            private IModel Model;
            private string exchangeName;
            private string routingKey;

            public void Init(string hosts, int port, int heartbeatInterval,// Dictionary<string, object> clientProperties,
                string exchange, string queue
                )
            {
                routingKey = queue;
                exchangeName = exchange;
                Factory = new RabbitMQ.Client.ConnectionFactory
                {
                    HostName = hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries).First().Trim(),
                    Port = port,
                    AutomaticRecoveryEnabled = false,
                    RequestedConnectionTimeout = 45000,
                    TopologyRecoveryEnabled = false,
                    UseBackgroundThreadsForIO = true,
                    DispatchConsumersAsync = false,
                    VirtualHost = "/",
                    NetworkRecoveryInterval = TimeSpan.FromSeconds(20),
                    Protocol = Protocols.DefaultProtocol,
                    RequestedHeartbeat = heartbeatInterval < 10 ? Convert.ToUInt16(30) : Convert.ToUInt16(heartbeatInterval),
                    ClientProperties = clientProperties,
                    UserName = "guest",
                    Password = "guest"
                };


                Connection = Factory.CreateConnection(hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
                Model = Connection.CreateModel();

                const string deadLetterSuffix = ".DeadLetter";
                const string deadLetterExchange = "Baldur.DeadLetter";
                Model.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null);
                Model.ExchangeDeclare(String.Concat(exchange, deadLetterSuffix), ExchangeType.Direct, true, false, null);
                Model.QueueDeclareNoWait(deadLetterExchange, true, false, false, null);
                Model.QueueBind(deadLetterExchange, String.Concat(exchange, deadLetterSuffix), queue, null);

                Dictionary<string, object> args = new Dictionary<string, object>();

                args[RabbitMQ.Client.Headers.XMaxPriority] = 10;
                args[RabbitMQ.Client.Headers.XDeadLetterExchange] = String.Concat(exchange, deadLetterSuffix);
                args[RabbitMQ.Client.Headers.XDeadLetterRoutingKey] = queue;

                try
                {
                   var qdo = Model.QueueDeclare(queue, true, false, false, args);

                    MessageCount  = qdo.MessageCount;
                }
                catch (Exception ex)
                {
                    const string messagePrefix = "RabbitQueuePublisher.Setup():";
                    Console.WriteLine(String.Concat(messagePrefix, ex.Message));
                }
                Model.QueueDeleteNoWait(queue);
            }

            public uint MessageCount
            {
                get;
                private set;
            }


            #region IDisposable Support
            private bool disposedValue = false; // To detect redundant calls

            protected virtual void Dispose(bool disposing)
            {
                if (!disposedValue)
                {
                    if (disposing)
                    {
                        // TODO: dispose managed state (managed objects).
                        this.Model.Close();
                        this.Connection.Close();
                    }
                    this.Model = null;
                    this.Connection = null;
                    this.Factory = null;
                    disposedValue = true;
                }
            }

            // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
            // ~Publisher() {
            //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            //   Dispose(false);
            // }

            // This code added to correctly implement the disposable pattern.
            public void Dispose()
            {
                // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
                Dispose(true);
                // TODO: uncomment the following line if the finalizer is overridden above.
                // GC.SuppressFinalize(this);
            }
            #endregion
        }

    }
}

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

This also holds some promise for further research.

    // Requires: using System; using System.Collections.Concurrent; using System.IO; using System.Threading;
    public class NetworkBufferStreamWriter : Stream
    {
        private readonly Timer timer;
        private readonly ConcurrentQueue<ArraySegment<byte>> buffers = new ConcurrentQueue<ArraySegment<byte>>();
        private readonly Stream input;
        private readonly MemoryStream stream = new MemoryStream();
        private const int interval = 750;
        private ArraySegment<byte> item = default(ArraySegment<byte>);
        public NetworkBufferStreamWriter(Stream stream)
        {
            this.input = stream;
            timer = new Timer(new TimerCallback(timer_callback), null, TimeSpan.FromMilliseconds(interval), Timeout.InfiniteTimeSpan);
        }
        public void timer_callback(object state)
        {
            stream.Position = 0;
            int i = 0;
            while (buffers.TryDequeue(out item))
            {
                stream.Write(item.Array, item.Offset, item.Count);
                i++;
            }
            Console.WriteLine($"Frames {i} of size {stream.Position}" );
            if (stream.Position > 0) input.Write(stream.GetBuffer(), 0, (int)stream.Position);
            timer.Change(TimeSpan.FromMilliseconds(interval), Timeout.InfiniteTimeSpan);
        }
        public override bool CanRead
        {
            get
            {
                return false;
            }
        }

        public override bool CanSeek
        {
            get
            {
                return false;
            }
        }

        public override bool CanWrite
        {
            get
            {
                return input.CanWrite;
            }
        }

        public override long Length
        {
            get
            {
                long sum = 0;
                foreach (var item in buffers) sum += item.Count;
                return sum;
            }
        }

        public override long Position
        {
            get;
            set;
        }

        public override void Flush()
        {
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new InvalidOperationException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new InvalidOperationException();
        }

        public override void SetLength(long value)
        {
            throw new InvalidOperationException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            //if (started)
            buffers.Enqueue(new ArraySegment<byte>(buffer, offset, count));
            //else
            //    input.Write(buffer, offset, count);
        }
    }

@michaelklishin michaelklishin changed the title from "Batch Publish" to "More efficient way of publishing of batches of messages" on Oct 17, 2017
@michaelklishin
Contributor

@YulerB thank you. Are there any benchmark run results you can share from your tests?

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

Run the test; the console outputs the numbers.
Run the test multiple times and you will see the numbers are consistent.

@michaelklishin
Contributor

@YulerB my point was that some folks looking at this PR would appreciate some numbers without having to compile and run the benchmark on their own.

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

Test Name: TestBoth
Test Outcome: Passed
Result StandardOutput:
Batch: 65538
Single: 16062

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

Test Name: TestBoth
Test Outcome: Passed
Result StandardOutput:
Batch: 95133
Single: 21074

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

Test Name: TestBoth
Test Outcome: Passed
Result StandardOutput:
Batch: 89098
Single: 12805

@YulerB
Contributor Author

YulerB commented Oct 17, 2017

The test is time-boxed to 30 seconds, with two connections: one doing BasicPublish and the other doing BasicBatchPublish, to separate queues.
After 30 seconds we read the message count of each queue and print it to the console.
We also assert that BasicBatchPublish delivered more messages than BasicPublish.

@YulerB
Contributor Author

YulerB commented Oct 19, 2017

I did some more research using a buffered write stream with an auto-flush every 750 ms, and while it helps, it's not as fast as batching.

@michaelklishin
Contributor

@kjnilsson @adamralph @danielmarbach @bording @lukebakken this is a reasonable feature and the results seem quite promising. WDYT?

IBasicProperties basicProperties, byte[] body);

[AmqpMethodDoNotImplement(null)]
void BasicBatchPublish(string exchange, string routingKey, bool mandatory,
Collaborator

IEnumerable is a great multi-purpose choice when you look at it from the caller's perspective. One of its downsides is that it requires you to be aware of multiple enumeration; you cannot access things like count or length without using LINQ (allocations), and it requires the library to copy things into internally materialized data structures.

If the API is considered lower-level I would lean towards using an array if possible, but, like I said, IEnumerable offers more convenience from the caller's perspective. Just some food for thought.

Contributor

It's generally OK if the implementation ensures that it only enumerates once. To get a count, it can enumerate into, e.g., a List<T> and inspect the Count property.
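For example (a sketch; the implementation would enumerate the caller's sequence exactly once and work off the list from then on):

    // Sketch: materialize the caller's IEnumerable exactly once so Count and indexing
    // are available without LINQ or a second enumeration.
    internal static IList<BatchMessage> MaterializeOnce(IEnumerable<BatchMessage> messages)
    {
        return messages as IList<BatchMessage> ?? new List<BatchMessage>(messages);
    }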

Contributor

Sorry, I see what you're saying: even a ToList() results in allocations internally. This is a valid concern for a hot code path.

TimeSpan ContinuationTimeout { get; set; }
}
public class BatchMessage{
public byte[] Body { get; set; }
Collaborator

How about making this thing immutable once created?
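For illustration, an immutable variant could look like this (a sketch, not what the PR currently contains):

    public sealed class BatchMessage
    {
        public BatchMessage(IBasicProperties basicProperties, byte[] body)
        {
            BasicProperties = basicProperties;
            Body = body;
        }

        // Set once at construction; no public setters.
        public IBasicProperties BasicProperties { get; }
        public byte[] Body { get; }
    }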


if (NextPublishSeqNo > 0)
{
lock (m_unconfirmedSet.SyncRoot)
Collaborator

Would it change anything if we would acquire all the sequence numbers in one go inside a lock instead of reacquiring the lock for each message?
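A rough sketch of that suggestion, assuming the fields shown in the diff context (NextPublishSeqNo, m_unconfirmedSet) and an Add method on the unconfirmed set:

    // Sketch only: reserve confirm sequence numbers for the whole batch under one lock
    // instead of re-acquiring the lock per message.
    if (NextPublishSeqNo > 0)
    {
        lock (m_unconfirmedSet.SyncRoot)
        {
            foreach (var message in batchMessages)
            {
                m_unconfirmedSet.Add(NextPublishSeqNo);
                NextPublishSeqNo++;
            }
        }
    }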

@danielmarbach
Collaborator

One of the good things about this is that NServiceBus introduced batched sends with v6. What this would bring is that things that need to be routed to the same destination could be batched together in one call, so from a feature perspective this sounds nice. But I cannot speak for @bording, since I'm not part of the RabbitMQ maintainer group at Particular (just a curious guy observing from the sidelines).

@YulerB
Contributor Author

YulerB commented Nov 3, 2017

In relation to receiving: sometimes the work pool has multiple messages. Would it be possible to raise an event with all the messages currently in the queue, or at least a few? This would maintain ordering but would allow batch receive (see the sketch below).
Many of the operations we perform in our message handlers can be performed in batch, for example updating/inserting records in a database, or calling REST APIs that could also support batching.
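Purely as an illustration of the receive-side idea, a hypothetical batch-delivery consumer could look something like this (not an existing API):

    // Hypothetical sketch: deliver whatever the work pool currently holds as one event,
    // preserving per-channel ordering within the batch.
    // Requires: using System.Collections.Generic; using RabbitMQ.Client.Events;
    public interface IBasicBatchConsumer
    {
        void HandleBasicDeliverBatch(IList<BasicDeliverEventArgs> deliveries);
    }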

@kjnilsson
Contributor

Yes, this looks promising, but there is a bit of work to be done before it can be merged. @YulerB, are you planning to add anything else? At the very least we need to fix the formatting and add a unit test to provide at least some coverage of this new API. If you wish, I can create a new branch with your commits and finish this up.

@YulerB
Contributor Author

YulerB commented Nov 3, 2017

No, I think this is a large enough change for the moment.

Problem is, I want to re-architect the client and server code too.

For example:

  • Batch Publish (this PR)
  • QoS would serve two purposes: one for total messages, and another for the largest batch from the server. (The server would send multiple frames, up to the QoS size, in one socket stream.)
  • Support a new consumer that raises an event into the client with all the messages currently received.
  • Batch Acknowledge.
  • Change the socket read code to use a task that reads/parses the messages and raises events into the parent.
    The code should spend less time blocked if receiving batches from the server. Also, the more items we accumulate on the client before the event is raised, the larger the batch size the client code could support.

@michaelklishin
Contributor

While some ways to implement "batched writes" for deliveries would be considered, QoS is already an overloaded setting and it's very unlikely that we will extend its semantics even more. Batched writes will likely require a protocol extension.

Let's focus on what's in scope for this PR.

@YulerB
Contributor Author

YulerB commented Nov 3, 2017

@kjnilsson, if you're happy to help get this across the line, thank you.

@bording
Collaborator

bording commented Nov 5, 2017

Overall, being able to put more than one frame into a single TCP packet when there is room seems like a good optimization. I agree with @danielmarbach about the usage of the collection interfaces, because enumerating them causes an allocation, but that's already an existing problem in the codebase.

Looking at the current shape of the proposed new API, I'm not sure I'd be able to use it with NServiceBus, though. The restriction on all of the messages needing to have the same exchange and routing key negates a lot of the potential. We're far more likely to have "batches" of messages needing to go to different destinations.

Given that this is just allowing multiple messages to be published with a single API call and not actually extending AMQP to have the concept of a batch send, it complicates the handling of publisher confirm acks as well.

@YulerB
Contributor Author

YulerB commented Nov 6, 2017

@bording, it should be possible to change it to work as suggested and send to any exchange/routing key; see the sketch below.
The only use case we had was a single destination. Do you have any blog posts about your routing strategies/use case?
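One way to support mixed destinations would be to carry the exchange and routing key on each batch entry, for example (a sketch, not what this PR currently implements):

    // Hypothetical shape: each entry names its own destination, so a single batch
    // can target several exchanges/routing keys while still being written to the
    // socket as one stream. The publish call would then drop the shared parameters:
    // model.BasicBatchPublish(batch);
    public class BatchMessage
    {
        public string Exchange { get; set; }
        public string RoutingKey { get; set; }
        public IBasicProperties basicProperties { get; set; }
        public byte[] Body { get; set; }
    }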

@kjnilsson
Contributor

@YulerB did you run your tests against anything other than localhost?

I am doing some work on this today and I intend to address some of the API issues above, but first I'd like to look into the significant performance difference shown in your test results, as it seems larger than expected. If your tests were run against the loopback adapter, it is likely that the MTU of the loopback adapter is significantly larger than it would be when the transport link is e.g. Ethernet (on my system the loopback MTU is 16384, whereas Ethernet is 1500). Your test also uses a relatively small message body (~40 bytes), which means the total size of the AMQP message is ~92 bytes, which fits many times into an Ethernet frame even after TCP/IP overhead. With a larger message body I'd expect the difference to be smaller. I will be running a few tests around this.

I've done a bit of testing with re-enabling Nagle's algorithm, but it doesn't seem to make much difference with the current code, so it does look like application-level batching is the way to go.

@danielmarbach
Collaborator

@YulerB, maybe off-topic for this PR, but I would still be curious to understand the motivation behind these changes. So far, based on what we've seen interacting with customers in the messaging world, many customers think they need really high throughput in the messaging layer and start doubting that layer first, when almost all the time it is the actual code running inside the message handler that is the slowest part of their system. I would be curious to understand what non-functional requirements in your architecture make you go down this path.

@YulerB
Contributor Author

YulerB commented Nov 6, 2017

To be fair, this one is hard to quantify.

The unit test's data size is enough to illustrate the benefit against localhost.

We see similar benefits in the running application; it usually depends on the variables of the job.

The significant variables are data size, batch size and bandwidth.

TCP scaling will also impact the results; however, larger TCP segments should see better scaling.

For a real-world test the data size should be increased. We see tables/files with about 10-15 columns, encoded as JSON.

@YulerB
Contributor Author

YulerB commented Nov 7, 2017

Here is another test that illustrates the benefit when testing against remote servers.
[TestClass]
public class TestBatchPublish
{
static readonly byte[] message = System.Text.Encoding.UTF8.GetBytes(@"The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!
The quick brown fox jumps over the lazy dog!");
static readonly string key = Guid.NewGuid().ToString();
static readonly string mimeType = "text/plain";
Publisher1 publisher = new Publisher1();
Publisher1 publisher1 = new Publisher1();
List<byte[]> messages = new List<byte[]>();
[TestMethod]
public void TestRabbitMQSendBatch()
{
for (int i = 0; i < 100000; i++)
{
messages.Add(message);
}
CancellationTokenSource cts = new CancellationTokenSource();
var tasks = new Task[] { Task.Run(() => { TestRabbitMQSendBatch(cts.Token); }) };
Thread.Sleep(30000);
cts.Cancel();
Task.WaitAll(tasks);

        QueueCounter q2 = new QueueCounter();
        q2.Init("<server>", -1, 30, "Baldur.Exchange", "BasicBatchPublishTestQueue", "<username>", "<password>");//.Setup();
        Console.WriteLine($"Batch: {q2.MessageCount}");
    }
    public void TestRabbitMQSendBatch(CancellationToken ct)
    {
        publisher1.Init("<server>", -1, 30, "Baldur.Exchange", "BasicBatchPublishTestQueue", "<username>", "<password>");//.Setup();

        int i = 0;
        const int batchSize = 10000;
        while (!ct.IsCancellationRequested && (i + 1) * batchSize <= messages.Count)
        {
            publisher1.BatchPublish(key, messages.GetRange(i++ * batchSize, batchSize), mimeType);
        }
    }
    [TestMethod]
    public void TestRabbitMQSend()
    {
        for (int i = 0; i < 100000; i++)
        {
            messages.Add(message);
        }
        CancellationTokenSource cts = new CancellationTokenSource();
        var tasks = new Task[] { Task.Run(() => { TestRabbitMQSend(cts.Token); }) };
        Thread.Sleep(30000);
        cts.Cancel();
        Task.WaitAll(tasks);

        QueueCounter q2 = new QueueCounter();
        q2.Init("<server>", -1, 30, "Baldur.Exchange", "BasicPublishTestQueue", "<username>", "<password>");//.Setup();
        Console.WriteLine($"Single: {q2.MessageCount}");
    }
    public void TestRabbitMQSend(CancellationToken ct)
    {
        publisher.Init("<server>", -1, 30, "Baldur.Exchange", "BasicPublishTestQueue", "<username>", "<password>");//.Setup();

        int i = 0;
        while (!ct.IsCancellationRequested && i < messages.Count) 
        {
            publisher.Publish(key, messages[i++], mimeType);
        }
    }
    [TestMethod]
    public void TestBoth()
    {
        for (int i = 0; i < 100000; i++)
        {
            messages.Add(message);
        }

        CancellationTokenSource cts = new CancellationTokenSource();
        var tasks = new Task[] { Task.Run(() => { TestRabbitMQSend(cts.Token); }), Task.Run(() => { TestRabbitMQSendBatch(cts.Token); }) };
        Thread.Sleep(30000);
        cts.Cancel();
        Task.WaitAll(tasks);

        QueueCounter q1 = new QueueCounter();
        q1.Init("<server>", -1, 30, "Baldur.Exchange", "BasicBatchPublishTestQueue", "<username>", "<password>");//.Setup();
        QueueCounter q2 = new QueueCounter();
        q2.Init("<server>", -1, 30, "Baldur.Exchange", "BasicPublishTestQueue","<username>", "<password>");//.Setup();
        Console.WriteLine($"Batch: {q1.MessageCount}");
        Console.WriteLine($"Single: {q2.MessageCount}");
        Assert.IsTrue(q1.MessageCount > q2.MessageCount);
    }
    public class Publisher1 : IDisposable
    {
        private static readonly Dictionary<string, object> clientProperties = new Dictionary<string, object>
            {
                { "ComputerName", Environment.MachineName },
                { "Is64BitProcess", Environment.Is64BitProcess },
                { "Is64BitOperatingSystem", Environment.Is64BitOperatingSystem },
                { "OSVersion", Environment.OSVersion.ToString() },
                { "SystemPageSize", Environment.SystemPageSize },
                { "UserDomainName", Environment.UserDomainName },
                { "UserName", Environment.UserName },
                { "CLR_Version", Environment.Version.ToString() },
                { "ProcessorCount", Environment.ProcessorCount },
                { "WorkingSet", Environment.WorkingSet }
    };
        private IConnection Connection;
        private ConnectionFactory Factory;
        private IModel Model;
        private string exchangeName;
        private string routingKey;

        public void Init(string hosts, int port, int heartbeatInterval,// Dictionary<string, object> clientProperties,
            string exchange, string queue, string userName, string password
            )
        {
            routingKey = queue;
            exchangeName = exchange;
            Factory = new RabbitMQ.Client.ConnectionFactory
            {
                HostName = hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries).First().Trim(),
                Port = port,
                AutomaticRecoveryEnabled = false,
                RequestedConnectionTimeout = 45000,
                TopologyRecoveryEnabled = false,
                UseBackgroundThreadsForIO = true,
                DispatchConsumersAsync = false,
                VirtualHost = "/",
                NetworkRecoveryInterval = TimeSpan.FromSeconds(20),
                Protocol = Protocols.DefaultProtocol,
                RequestedHeartbeat = heartbeatInterval < 10 ? Convert.ToUInt16(30) : Convert.ToUInt16(heartbeatInterval),
                ClientProperties = clientProperties,
                UserName = userName,
                Password = password
            };


            Connection = Factory.CreateConnection(hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
            Model = Connection.CreateModel();

            const string deadLetterSuffix = ".DeadLetter";
            const string deadLetterExchange = "Baldur.DeadLetter";
            Model.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null);
            Model.ExchangeDeclare(String.Concat(exchange, deadLetterSuffix), ExchangeType.Direct, true, false, null);
            Model.QueueDeclareNoWait(deadLetterExchange, true, false, false, null);
            Model.QueueBind(deadLetterExchange, String.Concat(exchange, deadLetterSuffix), queue, null);

            Dictionary<string, object> args = new Dictionary<string, object>();

            args[RabbitMQ.Client.Headers.XMaxPriority] = 10;
            args[RabbitMQ.Client.Headers.XDeadLetterExchange] = String.Concat(exchange, deadLetterSuffix);
            args[RabbitMQ.Client.Headers.XDeadLetterRoutingKey] = queue;

            try
            {
                Model.QueueDeclareNoWait(queue, true, false, false, args);
            }
            catch (Exception ex)
            {
                const string messagePrefix = "RabbitQueuePublisher.Setup():";
                Console.WriteLine(String.Concat(messagePrefix, ex.Message));
            }

            Model.QueueBind(queue, exchange, queue, null);
        }

        const byte PERSISTENT = 2;
        const string jsonMimeType = "application/json";
        IBasicProperties Props;
        public void Publish(string correlationId, byte[] message, string mimeType)
        {
            if (Props == null)
            {
                Props = Model?.CreateBasicProperties();
                Props.ContentType = mimeType;
                if (mimeType == jsonMimeType)
                    Props.ContentEncoding = Encoding.UTF8.BodyName;
                Props.DeliveryMode = PERSISTENT;
                Props.MessageId = Guid.NewGuid().ToString();
                Props.CorrelationId = correlationId;
            }

            Model?.BasicPublish(
                exchangeName,
                routingKey,
                Props,
                message
            );
        }
        public void BatchPublish(string correlationId, IEnumerable<byte[]> messages, string mimeType)
        {
            Props = Model?.CreateBasicProperties();
            Props.ContentType = mimeType;
            Props.ContentEncoding = Encoding.UTF8.BodyName;
            Props.DeliveryMode = PERSISTENT;
            Props.CorrelationId = correlationId;
            List<BatchMessage> batchMessages = new List<BatchMessage>();
            foreach (var item in messages)
            {
                batchMessages.Add(new BatchMessage { basicProperties = Props, Body = item });
            }

            Model?.BasicBatchPublish(exchangeName, routingKey, batchMessages);
        }

        #region IDisposable Support
        private bool disposedValue = false; // To detect redundant calls

        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    // TODO: dispose managed state (managed objects).
                    this.Model.Close();
                    this.Connection.Close();
                }
                this.Model = null;
                this.Connection = null;
                this.Factory = null;
                disposedValue = true;
            }
        }

        // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
        // ~Publisher() {
        //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        //   Dispose(false);
        // }

        // This code added to correctly implement the disposable pattern.
        public void Dispose()
        {
            // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            Dispose(true);
            // TODO: uncomment the following line if the finalizer is overridden above.
            // GC.SuppressFinalize(this);
        }
        #endregion
    }
    public class QueueCounter : IDisposable
    {
        private static readonly Dictionary<string, object> clientProperties = new Dictionary<string, object>
            {
                { "ComputerName", Environment.MachineName },
                { "Is64BitProcess", Environment.Is64BitProcess },
                { "Is64BitOperatingSystem", Environment.Is64BitOperatingSystem },
                { "OSVersion", Environment.OSVersion.ToString() },
                { "SystemPageSize", Environment.SystemPageSize },
                { "UserDomainName", Environment.UserDomainName },
                { "UserName", Environment.UserName },
                { "CLR_Version", Environment.Version.ToString() },
                { "ProcessorCount", Environment.ProcessorCount },
                { "WorkingSet", Environment.WorkingSet }
    };
        private IConnection Connection;
        private ConnectionFactory Factory;
        private IModel Model;
        private string exchangeName;
        private string routingKey;

        public void Init(string hosts, int port, int heartbeatInterval,// Dictionary<string, object> clientProperties,
            string exchange, string queue, string userName, string password
            )
        {
            routingKey = queue;
            exchangeName = exchange;
            Factory = new RabbitMQ.Client.ConnectionFactory
            {
                HostName = hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries).First().Trim(),
                Port = port,
                AutomaticRecoveryEnabled = false,
                RequestedConnectionTimeout = 45000,
                TopologyRecoveryEnabled = false,
                UseBackgroundThreadsForIO = true,
                DispatchConsumersAsync = false,
                VirtualHost = "/",
                NetworkRecoveryInterval = TimeSpan.FromSeconds(20),
                Protocol = Protocols.DefaultProtocol,
                RequestedHeartbeat = heartbeatInterval < 10 ? Convert.ToUInt16(30) : Convert.ToUInt16(heartbeatInterval),
                ClientProperties = clientProperties,
                UserName = userName,
                Password = password
            };

            Connection = Factory.CreateConnection(hosts.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
            Model = Connection.CreateModel();

            const string deadLetterSuffix = ".DeadLetter";
            const string deadLetterExchange = "Baldur.DeadLetter";
            Model.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null);
            Model.ExchangeDeclare(String.Concat(exchange, deadLetterSuffix), ExchangeType.Direct, true, false, null);
            Model.QueueDeclareNoWait(deadLetterExchange, true, false, false, null);
            Model.QueueBind(deadLetterExchange, String.Concat(exchange, deadLetterSuffix), queue, null);

            Dictionary<string, object> args = new Dictionary<string, object>();

            args[RabbitMQ.Client.Headers.XMaxPriority] = 10;
            args[RabbitMQ.Client.Headers.XDeadLetterExchange] = String.Concat(exchange, deadLetterSuffix);
            args[RabbitMQ.Client.Headers.XDeadLetterRoutingKey] = queue;

            try
            {
                var qdo = Model.QueueDeclare(queue, true, false, false, args);

                MessageCount = qdo.MessageCount;
            }
            catch (Exception ex)
            {
                const string messagePrefix = "RabbitQueuePublisher.Setup():";
                Console.WriteLine(String.Concat(messagePrefix, ex.Message));
            }
            Model.QueueDeleteNoWait(queue);
        }

        public uint MessageCount
        {
            get;
            private set;
        }


        #region IDisposable Support
        private bool disposedValue = false; // To detect redundant calls

        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    // TODO: dispose managed state (managed objects).
                    this.Model.Close();
                    this.Connection.Close();
                }
                this.Model = null;
                this.Connection = null;
                this.Factory = null;
                disposedValue = true;
            }
        }

        // TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
        // ~Publisher() {
        //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        //   Dispose(false);
        // }

        // This code added to correctly implement the disposable pattern.
        public void Dispose()
        {
            // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            Dispose(true);
            // TODO: uncomment the following line if the finalizer is overridden above.
            // GC.SuppressFinalize(this);
        }
        #endregion
    }
}


@YulerB
Contributor Author

YulerB commented Nov 7, 2017

Here is a basic diagram of the difference in network utilization.

[Attached diagram: network utilization]

@kjnilsson
Contributor

I've republished the commits from this PR as well as refactored the approach here.

@kjnilsson
Contributor

@YulerB what is the MTU of your loopback adapter in that test? I changed my loopback adapter to mimic the MTU size of an ethernet network and the batch approach is still ~3 times more efficient.

@YulerB
Contributor Author

YulerB commented Nov 7, 2017

There's a cool movie called Antitrust; in it, a programmer lets slip a comment about his software: "the answer's not in the box, it's in the band". I've probably seen the movie too many times...

@YulerB
Contributor Author

YulerB commented Nov 7, 2017

In that test I'm not using loopback; I was connecting to a server located in NAM from Ireland over a 10 Gb network, with an MTU of 1500.

@YulerB
Contributor Author

YulerB commented Nov 7, 2017

[Attachment: MTU.txt]

@kjnilsson
Contributor

I haven't tested this, but if you wanted to try to get RabbitMQ (or Erlang, rather) to perform some write batching, you could try setting the {delay_send, true} socket option in the tcp_listen_options config. This setting affects all AMQP sockets, of course, but it may be interesting to test.

@YulerB
Contributor Author

YulerB commented Nov 14, 2017

Closing this pull request to allow this functionality to be completed in pull request #368.

@YulerB
Contributor Author

YulerB commented Nov 14, 2017

closing

@YulerB YulerB closed this Nov 14, 2017