diff --git a/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs index 44aa1b2343a..e53a52f9310 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs @@ -32,6 +32,7 @@ public NodeChurnConfig() .WithFallback(ConfigurationFactory.ParseString(@" akka.cluster.auto-down-unreachable-after = 1s akka.remote.log-frame-size-exceeding = 2000b + akka.remote.dot-netty.tcp.batching.enabled = false # disable batching ")) .WithFallback(MultiNodeClusterSpec.ClusterConfig()); } diff --git a/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs b/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs index 42fea6f326a..dec3149ac93 100644 --- a/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs +++ b/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using Akka.Actor; +using Akka.Configuration; using Akka.Remote.TestKit; namespace Akka.Remote.Tests.MultiNode @@ -21,7 +22,10 @@ public RemoteDeliveryMultiNetSpec() Second = Role("second"); Third = Role("third"); - CommonConfig = DebugConfig(false); + CommonConfig = DebugConfig(true) + .WithFallback(ConfigurationFactory.ParseString(@" + akka.remote.dot-netty.tcp.batching.enabled = false # disable batching + ")); } public RoleName First { get; } diff --git a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs index 73d2129203b..4f22f42bc76 100644 --- a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs +++ b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs @@ -165,7 +165,19 @@ public void Remoting_should_contain_correct_hostname_values_in_ReferenceConf() //Non-specified hostnames should default to IPAddress.Any Assert.Equal(IPAddress.Any.ToString(), s.Hostname); Assert.Equal(IPAddress.Any.ToString(), s.PublicHostname); - } + } + + [Fact] + public void 
Remoting_should_contain_correct_BatchWriter_settings_in_ReferenceConf() + { + var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp"); + var s = DotNettyTransportSettings.Create(c); + + s.BatchWriterSettings.EnableBatching.Should().BeTrue(); + s.BatchWriterSettings.FlushInterval.Should().Be(BatchWriterSettings.DefaultFlushInterval); + s.BatchWriterSettings.MaxPendingBytes.Should().Be(BatchWriterSettings.DefaultMaxPendingBytes); + s.BatchWriterSettings.MaxPendingWrites.Should().Be(BatchWriterSettings.DefaultMaxPendingWrites); + } } } diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs new file mode 100644 index 00000000000..abf82afa4df --- /dev/null +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -0,0 +1,140 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2019 Lightbend Inc. +// Copyright (C) 2013-2019 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Remote.Transport.DotNetty; +using Akka.TestKit; +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Embedded; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; +
+namespace Akka.Remote.Tests.Transport
+{
+    /// <summary>
+    /// Specs for <see cref="BatchWriter"/>, driven through a DotNetty <see cref="EmbeddedChannel"/>
+    /// whose pipeline is a <see cref="FlushLogger"/> followed by the writer under test.
+    /// </summary>
+    public class DotNettyBatchWriterSpecs : AkkaSpec
+    {
+        /// <summary>
+        /// Pipeline handler that writes a line to the test output on every flush and exposes
+        /// a task that completes once <see cref="ChannelActive"/> has been observed.
+        /// </summary>
+        public class FlushLogger : ChannelHandlerAdapter
+        {
+            public ITestOutputHelper Helper;
+
+            // Completed with Done.Instance from ChannelActive; the type argument is required
+            // for TrySetResult(Done.Instance) below to compile.
+            private readonly TaskCompletionSource<Done> _tcs = new TaskCompletionSource<Done>();
+
+            /// <summary>
+            /// Completes when the channel this handler is attached to has become active.
+            /// </summary>
+            public Task Activated => _tcs.Task;
+
+            public FlushLogger(ITestOutputHelper helper)
+            {
+                Helper = helper;
+            }
+
+            public override void ChannelActive(IChannelHandlerContext context)
+            {
+                _tcs.TrySetResult(Done.Instance);
+                base.ChannelActive(context);
+            }
+
+            public override void Flush(IChannelHandlerContext context)
+            {
+                Helper.WriteLine($"[{DateTime.UtcNow}] flushed");
+
+                base.Flush(context);
+            }
+        }
+
+        public FlushLogger Flush { get; }
+
+        public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(helper)
+        {
+            Flush = new FlushLogger(helper);
+        }
+
+        /// <summary>
+        /// Stay below the write / count and write / byte threshold. Rely on the timer.
+        /// </summary>
+        [Fact]
+        public async Task BatchWriter_should_succeed_with_timer()
+        {
+            var writer = new BatchWriter(new BatchWriterSettings());
+            var ch = new EmbeddedChannel(Flush, writer);
+
+            await Flush.Activated;
+
+            /*
+             * Run multiple iterations to ensure that the batching mechanism doesn't become stale
+             */
+            for (var iteration = 0; iteration < 3; iteration++)
+            {
+                var ints = Enumerable.Range(0, 4).ToArray();
+                foreach (var i in ints)
+                {
+                    _ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i));
+                }
+
+                // force write tasks to run
+                ch.RunPendingTasks();
+
+                // every WriteInt contributes 4 bytes; nothing may have been flushed yet
+                ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4);
+                ch.OutboundMessages.Count.Should().Be(0);
+
+                await AwaitAssertAsync(() =>
+                {
+                    ch.RunPendingTasks(); // force scheduled task to run
+                    ch.OutboundMessages.Count.Should().Be(ints.Length);
+                }, interval: 100.Milliseconds());
+
+                // reset the outbound queue
+                ch.OutboundMessages.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Buffer a handful of writes (below the write / count and write / byte threshold),
+        /// close the channel, and verify that every buffered write still reaches the outbound queue.
+        /// </summary>
+        [Fact]
+        public async Task BatchWriter_should_flush_messages_during_shutdown()
+        {
+            var writer = new BatchWriter(new BatchWriterSettings());
+            var ch = new EmbeddedChannel(Flush, writer);
+
+            await Flush.Activated;
+
+            var ints = Enumerable.Range(0, 10).ToArray();
+            foreach (var i in ints)
+            {
+                _ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i));
+            }
+
+            // force write tasks to run
+            ch.RunPendingTasks();
+
+            // every WriteInt contributes 4 bytes; nothing may have been flushed yet
+            ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4);
+            ch.OutboundMessages.Count.Should().Be(0);
+
+            // close channels
+            _ = ch.CloseAsync();
+
+            await AwaitAssertAsync(() =>
+            {
+                ch.RunPendingTasks(); // force scheduled task to run
+                ch.OutboundMessages.Count.Should().Be(ints.Length);
+            }, interval: 100.Milliseconds());
+        }
+    }
+}
diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 08c1fc87bc0..d41c784aa8f 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -437,6 +437,42 @@ akka { # for the following bug: https://github.com/akkadotnet/akka.net/issues/3370 enable-pooling = true + # PERFORMANCE TUNING + # + # The batching feature of DotNetty is designed to help batch together logical writes into a smaller + # number of physical writes across the socket. This helps significantly reduce the number of system calls + # and can improve performance by as much as 150% on some systems when enabled. + batching{ + + # Enables the batching system. When disabled, every write will be flushed immediately. + # Disable this setting if you're working with a VERY low-traffic system that requires + # fast (< 40ms) acknowledgement for all periodic messages. + enabled = true + + # The max write threshold based on the number of logical messages regardless of their size. 
+ # This is a safe default value - decrease it if you have a small number of remote actors + # who engage in frequent request->response communication which requires low latency (< 40ms). + max-pending-writes = 30 + + # The max write threshold based on the byte size of all buffered messages. If there are 4 messages + # waiting to be written (with batching.max-pending-writes = 30) but their total size is greater than + # batching.max-pending-bytes, a flush will be triggered immediately. + # + # Increase this value if you have larger message sizes and want to take advantage of batching, but + # otherwise leave it as-is. + # + # NOTE: this value should always be smaller than dot-netty.tcp.maximum-frame-size. + max-pending-bytes = 16k + + # In the event that neither the batching.max-pending-writes nor the batching.max-pending-bytes + # threshold is hit, we guarantee that all pending writes will be flushed within this interval. + # + # This setting, realistically, can't be enforced any lower than the OS' clock resolution (~20ms). + # If you have a very low-traffic system, either disable batching altogether or lower the batching.max-pending-writes + # threshold to maximize throughput. Otherwise, leave this setting as-is. + flush-interval = 40ms + } + # If set to "<id.of.dispatcher>" then the specified dispatcher # will be used to accept inbound connections, and perform IO. If "" then # dedicated threads will be used. 
@@ -585,8 +621,6 @@ akka { thread-count = 4 } } - - } } \ No newline at end of file diff --git a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs index 1c36340c5a4..28a2a1dc808 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs @@ -8,14 +8,17 @@ using System; using System.Net; using System.Text; +using System.Threading; using System.Threading.Tasks; using Akka.Util; using DotNetty.Buffers; +using DotNetty.Common.Concurrency; using DotNetty.Transport.Channels; using ILoggingAdapter = Akka.Event.ILoggingAdapter; namespace Akka.Remote.Transport.DotNetty { + /// /// INTERNAL API /// diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs new file mode 100644 index 00000000000..de39d7575be --- /dev/null +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -0,0 +1,185 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2019 Lightbend Inc. +// Copyright (C) 2013-2019 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Configuration; +using DotNetty.Buffers; +using DotNetty.Common.Concurrency; +using DotNetty.Transport.Channels; + +namespace Akka.Remote.Transport.DotNetty +{ + /// + /// INTERNAL API. 
+    ///
+    /// Configuration object for the BatchWriter channel handler.
+    ///
+    internal class BatchWriterSettings
+    {
+        public const int DefaultMaxPendingWrites = 30;
+        public const long DefaultMaxPendingBytes = 16 * 1024L;
+        public static readonly TimeSpan DefaultFlushInterval = TimeSpan.FromMilliseconds(40);
+
+        /// <summary>
+        /// Reads the batching settings from a HOCON section using the keys "enabled",
+        /// "max-pending-writes", "max-pending-bytes" and "flush-interval".
+        /// Any key that is absent falls back to the corresponding default.
+        /// </summary>
+        /// <param name="hocon">
+        /// The batching section. May be null, in which case every setting takes its default.
+        /// </param>
+        public BatchWriterSettings(Config hocon)
+        {
+            if (hocon == null)
+            {
+                // The caller passes config.GetConfig("batching") straight through; that lookup
+                // can come back null when the section is missing (e.g. a transport config
+                // written before batching existed). Use the defaults instead of throwing.
+                EnableBatching = true;
+                MaxPendingWrites = DefaultMaxPendingWrites;
+                MaxPendingBytes = DefaultMaxPendingBytes;
+                FlushInterval = DefaultFlushInterval;
+                return;
+            }
+
+            EnableBatching = hocon.GetBoolean("enabled", true);
+            MaxPendingWrites = hocon.GetInt("max-pending-writes", DefaultMaxPendingWrites);
+            MaxPendingBytes = hocon.GetByteSize("max-pending-bytes") ?? DefaultMaxPendingBytes;
+            FlushInterval = hocon.GetTimeSpan("flush-interval", DefaultFlushInterval, false);
+        }
+
+        /// <summary>
+        /// Creates settings from explicit values; every argument is optional.
+        /// </summary>
+        /// <param name="maxDuration">Value for <see cref="FlushInterval"/>; null selects <see cref="DefaultFlushInterval"/>.</param>
+        /// <param name="enableBatching">Value for <see cref="EnableBatching"/>.</param>
+        /// <param name="maxPendingWrites">Value for <see cref="MaxPendingWrites"/>.</param>
+        /// <param name="maxPendingBytes">Value for <see cref="MaxPendingBytes"/>.</param>
+        public BatchWriterSettings(TimeSpan? maxDuration = null, bool enableBatching = true,
+            int maxPendingWrites = DefaultMaxPendingWrites, long maxPendingBytes = DefaultMaxPendingBytes)
+        {
+            EnableBatching = enableBatching;
+            MaxPendingWrites = maxPendingWrites;
+            FlushInterval = maxDuration ?? DefaultFlushInterval;
+            MaxPendingBytes = maxPendingBytes;
+        }
+
+        /// <summary>
+        /// Toggle for turning this feature on or off.
+        /// </summary>
+        /// <remarks>
+        /// Defaults to true.
+        /// </remarks>
+        public bool EnableBatching { get; }
+
+        /// <summary>
+        /// The maximum amount of buffered writes that can be buffered before flushing I/O.
+        /// </summary>
+        /// <remarks>
+        /// Defaults to 30.
+        /// </remarks>
+        public int MaxPendingWrites { get; }
+
+        /// <summary>
+        /// In the event of low-traffic channels, the maximum amount of time we'll wait before flushing writes.
+        /// </summary>
+        /// <remarks>
+        /// Defaults to 40 milliseconds.
+        /// </remarks>
+        public TimeSpan FlushInterval { get; }
+
+        /// <summary>
+        /// The maximum number of outstanding bytes that can be written prior to a flush.
+        /// </summary>
+        /// <remarks>
+        /// Defaults to 16kb.
+        /// </remarks>
+        public long MaxPendingBytes { get; }
+    }
+
+    ///
+    /// INTERNAL API.
+    ///
+    /// Responsible for batching socket writes together into fewer sys calls to the socket. 
+    ///
+    internal class BatchWriter : ChannelHandlerAdapter
+    {
+        public readonly BatchWriterSettings Settings;
+
+        /// <summary>
+        /// True while the recurring flush task is allowed to reschedule itself.
+        /// Cleared when the channel is closed or becomes inactive, or when this handler is removed.
+        /// </summary>
+        internal bool CanSchedule { get; private set; } = true;
+
+        /// <summary>
+        /// Creates a new writer.
+        /// </summary>
+        /// <param name="settings">Batching thresholds and the flush interval. Must not be null.</param>
+        /// <exception cref="ArgumentNullException">Thrown when <paramref name="settings"/> is null.</exception>
+        public BatchWriter(BatchWriterSettings settings)
+        {
+            // Fail at construction rather than with a NullReferenceException in HandlerAdded.
+            if (settings == null)
+                throw new ArgumentNullException(nameof(settings));
+
+            Settings = settings;
+        }
+
+        // Writes / bytes passed downstream since the last flush issued by this handler.
+        private int _currentPendingWrites = 0;
+        private long _currentPendingBytes;
+
+        /// <summary>
+        /// True when at least one write has been counted since the last <see cref="Reset"/>.
+        /// </summary>
+        public bool HasPendingWrites => _currentPendingWrites > 0;
+
+        public override void HandlerAdded(IChannelHandlerContext context)
+        {
+            if (Settings.EnableBatching)
+                ScheduleFlush(context); // only schedule flush operations when batching is enabled
+            base.HandlerAdded(context);
+        }
+
+        public override void HandlerRemoved(IChannelHandlerContext context)
+        {
+            // Once removed from the pipeline there is nothing left for the timer to flush.
+            CanSchedule = false;
+            base.HandlerRemoved(context);
+        }
+
+        public override void ChannelInactive(IChannelHandlerContext context)
+        {
+            // CloseAsync below is only reached for a close issued through the outbound pipeline.
+            // Stop the timer here as well so that a channel which becomes inactive by any
+            // other route does not leave the flush task rescheduling itself.
+            CanSchedule = false;
+            base.ChannelInactive(context);
+        }
+
+        /// <summary>
+        /// Passes the write downstream and flushes once either pending threshold is reached.
+        /// With batching disabled every write is flushed immediately.
+        /// </summary>
+        /// <param name="context">The handler context.</param>
+        /// <param name="message">The outbound message; only IByteBuffer messages count toward the byte threshold.</param>
+        /// <returns>The task returned by the downstream write.</returns>
+        public override Task WriteAsync(IChannelHandlerContext context, object message)
+        {
+            // Measure before handing the message downstream, and tolerate non-buffer
+            // messages instead of failing the write with an InvalidCastException.
+            var buffer = message as IByteBuffer;
+            var messageBytes = buffer != null ? buffer.ReadableBytes : 0;
+
+            /*
+             * Need to add the write to the rest of the pipeline first before we
+             * include it in the formula for determining whether or not we flush
+             * right now. The reason being is that if we did this the other way around,
+             * we could flush first before the write was in the "flushable" buffer and
+             * this can lead to "dangling writes" that never actually get transmitted
+             * across the network.
+             */
+            var write = base.WriteAsync(context, message);
+            if (!Settings.EnableBatching)
+            {
+                context.Flush();
+                return write;
+            }
+
+            _currentPendingBytes += messageBytes;
+            _currentPendingWrites++;
+            if (_currentPendingWrites >= Settings.MaxPendingWrites
+                || _currentPendingBytes >= Settings.MaxPendingBytes)
+            {
+                context.Flush();
+                Reset();
+            }
+
+            return write;
+        }
+
+        public override Task CloseAsync(IChannelHandlerContext context)
+        {
+            // flush any pending writes first
+            context.Flush();
+            Reset(); // the counters describe writes that have just been flushed
+            CanSchedule = false;
+            return base.CloseAsync(context);
+        }
+
+        private void ScheduleFlush(IChannelHandlerContext context)
+        {
+            // Schedule a recurring flush - only fires when there's writable data
+            var task = new FlushTask(context, Settings.FlushInterval, this);
+            context.Executor.Schedule(task, Settings.FlushInterval);
+        }
+
+        /// <summary>
+        /// Zeroes the pending write and byte counters.
+        /// </summary>
+        public void Reset()
+        {
+            _currentPendingWrites = 0;
+            _currentPendingBytes = 0;
+        }
+
+        /// <summary>
+        /// Timer callback: flushes when the writer has counted pending writes, then
+        /// reschedules itself for as long as the writer's CanSchedule flag is set.
+        /// </summary>
+        class FlushTask : IRunnable
+        {
+            private readonly IChannelHandlerContext _context;
+            private readonly TimeSpan _interval;
+            private readonly BatchWriter _writer;
+
+            public FlushTask(IChannelHandlerContext context, TimeSpan interval, BatchWriter writer)
+            {
+                _context = context;
+                _interval = interval;
+                _writer = writer;
+            }
+
+            public void Run()
+            {
+                if (_writer.HasPendingWrites)
+                {
+                    // execute a flush operation
+                    _context.Flush();
+                    _writer.Reset();
+                }
+
+                if (_writer.CanSchedule)
+                    _context.Executor.Schedule(this, _interval); // reschedule
+            }
+        }
+    }
+}
diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 7372c9f82b9..640faf07fdb 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -327,6 +327,8 @@ private void SetInitialChannelPipeline(IChannel channel) pipeline.AddLast("FrameEncoder", new LengthFieldPrepender(Settings.ByteOrder, 4, 0, false)); } } + + 
pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings)); } private void SetClientPipeline(IChannel channel, Address remoteAddress) diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs index b70604cbbce..f33f1afa8ff 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs @@ -69,6 +69,8 @@ public static DotNettyTransportSettings Create(Config config) default: throw new ArgumentException($"Unknown byte-order option [{byteOrderString}]. Supported options are: big-endian, little-endian."); } + var batchWriterSettings = new BatchWriterSettings(config.GetConfig("batching")); + return new DotNettyTransportSettings( transportMode: transportMode == "tcp" ? TransportMode.Tcp : TransportMode.Udp, enableSsl: config.GetBoolean("enable-ssl", false), @@ -94,7 +96,8 @@ public static DotNettyTransportSettings Create(Config config) backwardsCompatibilityModeEnabled: config.GetBoolean("enable-backwards-compatibility", false), logTransport: config.HasPath("log-transport") && config.GetBoolean("log-transport"), byteOrder: order, - enableBufferPooling: config.GetBoolean("enable-pooling", true)); + enableBufferPooling: config.GetBoolean("enable-pooling", true), + batchWriterSettings: batchWriterSettings); } private static int? ToNullableInt(long? value) => value.HasValue && value.Value > 0 ? (int?)value.Value : null; @@ -230,10 +233,16 @@ private static int ComputeWorkerPoolSize(Config config) /// public readonly bool EnableBufferPooling; + /// + /// Used for performance-tuning the DotNetty channels to maximize I/O performance. + /// + public readonly BatchWriterSettings BatchWriterSettings; + public DotNettyTransportSettings(TransportMode transportMode, bool enableSsl, TimeSpan connectTimeout, string hostname, string publicHostname, int port, int? 
publicPort, int serverSocketWorkerPoolSize, int clientSocketWorkerPoolSize, int maxFrameSize, SslSettings ssl, bool dnsUseIpv6, bool tcpReuseAddr, bool tcpKeepAlive, bool tcpNoDelay, int backlog, bool enforceIpFamily, - int? receiveBufferSize, int? sendBufferSize, int? writeBufferHighWaterMark, int? writeBufferLowWaterMark, bool backwardsCompatibilityModeEnabled, bool logTransport, ByteOrder byteOrder, bool enableBufferPooling) + int? receiveBufferSize, int? sendBufferSize, int? writeBufferHighWaterMark, int? writeBufferLowWaterMark, bool backwardsCompatibilityModeEnabled, bool logTransport, ByteOrder byteOrder, + bool enableBufferPooling, BatchWriterSettings batchWriterSettings) { if (maxFrameSize < 32000) throw new ArgumentException("maximum-frame-size must be at least 32000 bytes", nameof(maxFrameSize)); @@ -262,6 +271,7 @@ public DotNettyTransportSettings(TransportMode transportMode, bool enableSsl, Ti LogTransport = logTransport; ByteOrder = byteOrder; EnableBufferPooling = enableBufferPooling; + BatchWriterSettings = batchWriterSettings; } } internal enum TransportMode diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index 7af12de9e47..ae7671bdf40 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -8,6 +8,7 @@ using System; using System.Net; using System.Net.Sockets; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; @@ -147,6 +148,7 @@ public override void ChannelActive(IChannelHandlerContext context) { InitOutbound(context.Channel, (IPEndPoint)context.Channel.RemoteAddress, null); base.ChannelActive(context); + } private void InitOutbound(IChannel channel, IPEndPoint socketAddress, object msg) @@ -169,16 +171,17 @@ public TcpAssociationHandle(Address localAddress, Address remoteAddress, DotNett public override bool Write(ByteString 
payload) { - if (_channel.Open && _channel.IsWritable) + if (_channel.Open) { - var data = ToByteBuffer(payload); - _channel.WriteAndFlushAsync(data); + var data = ToByteBuffer(_channel, payload); + _channel.WriteAsync(data); return true; } return false; } - private IByteBuffer ToByteBuffer(ByteString payload) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static IByteBuffer ToByteBuffer(IChannel channel, ByteString payload) { //TODO: optimize DotNetty byte buffer usage // (maybe custom IByteBuffer working directly on ByteString?) @@ -188,6 +191,7 @@ private IByteBuffer ToByteBuffer(ByteString payload) public override void Disassociate() { + _channel.Flush(); // flush before we close _channel.CloseAsync(); } }