From d4944bc0c68e16da985ce8eebf6d62ef1197a7d2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 17 Dec 2019 02:11:21 -0600 Subject: [PATCH 01/19] adding DotNetty write batching support --- .../Transport/DotNetty/AkkaLoggingHandler.cs | 35 +++++++++++++++++++ .../Transport/DotNetty/TcpTransport.cs | 4 +-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs index 1c36340c5a4..978c006b3e6 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs @@ -16,6 +16,41 @@ namespace Akka.Remote.Transport.DotNetty { + internal class BatchWriter : ChannelHandlerAdapter + { + private readonly int _maxPendingWrites; + private readonly long _maxPendingMillis; + + public BatchWriter(int maxPendingWrites = 30, long maxPendingMillis) + { + _maxPendingWrites = maxPendingWrites; + _maxPendingMillis = maxPendingMillis; + } + + private int _currentPendingWrites = 0; + private long _lastFlush = 0; + + public override Task WriteAsync(IChannelHandlerContext context, object message) + { + var write = base.WriteAsync(context, message); + if (++_currentPendingWrites == _maxPendingWrites || TimeToFlush()) + { + context.Flush(); + } + } + + private bool TimeToFlush() + { + return MonotonicClock.GetMilliseconds() - _lastFlush >= _maxPendingMillis; + } + + private void Reset() + { + _lastFlush = MonotonicClock.GetMilliseconds; + _currentPendingWrites = 0; + } + } + /// /// INTERNAL API /// diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index 2e19ec39ec2..cd66b848834 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -169,10 +169,10 @@ public TcpAssociationHandle(Address localAddress, Address remoteAddress, DotNett public override bool Write(ByteString payload) { - if (_channel.Open && _channel.IsWritable) + if (_channel.Open) { var data = ToByteBuffer(payload); - _channel.WriteAndFlushAsync(data); + _channel.WriteAsync(data); return true; } return false; From 360e81622287be208c3945fa2d5fc5aea22e632c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 17 Dec 2019 11:39:58 -0600 Subject: [PATCH 02/19] DotNetty flush-batching (performance) --- .../Transport/DotNetty/AkkaLoggingHandler.cs | 60 ++++++++++++++++--- .../Transport/DotNetty/DotNettyTransport.cs | 2 + 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs index 978c006b3e6..e1b5ff8c371 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs @@ -8,9 +8,11 @@ using System; using System.Net; using System.Text; +using System.Threading; using System.Threading.Tasks; using Akka.Util; using DotNetty.Buffers; +using DotNetty.Common.Concurrency; using DotNetty.Transport.Channels; using ILoggingAdapter = Akka.Event.ILoggingAdapter; @@ -21,34 +23,76 @@ internal class BatchWriter : ChannelHandlerAdapter private readonly int _maxPendingWrites; private readonly long _maxPendingMillis; - public BatchWriter(int maxPendingWrites = 30, long maxPendingMillis) + public BatchWriter(int maxPendingWrites = 30, long maxPendingMillis = 40l) { _maxPendingWrites = maxPendingWrites; _maxPendingMillis = maxPendingMillis; } private int _currentPendingWrites = 0; - private long _lastFlush = 0; + + public bool HasPendingWrites => _currentPendingWrites > 0; + + public override void HandlerAdded(IChannelHandlerContext context) + { + ScheduleFlush(context); + base.HandlerAdded(context); + } public override Task WriteAsync(IChannelHandlerContext context, object message) { var write = base.WriteAsync(context, message); - if (++_currentPendingWrites == _maxPendingWrites || TimeToFlush()) + if (++_currentPendingWrites == _maxPendingWrites) { context.Flush(); - } + Reset(); + } + + return write; } - private bool TimeToFlush() + void ScheduleFlush(IChannelHandlerContext context) { - return MonotonicClock.GetMilliseconds() - _lastFlush >= _maxPendingMillis; + // Schedule a recurring flush - only fires when there's writable data + var time = TimeSpan.FromMilliseconds(_maxPendingMillis); + var task = new FlushTask(context, time, this); + context.Executor.Schedule(task, time); } - private void Reset() + public void Reset() { - _lastFlush = MonotonicClock.GetMilliseconds; _currentPendingWrites = 0; } + + class FlushTask : IRunnable + { + private readonly IChannelHandlerContext _context; + private readonly TimeSpan _interval; + private readonly BatchWriter _writer; + + public FlushTask(IChannelHandlerContext context, TimeSpan interval, BatchWriter writer) + { + _context = context; + _interval = interval; + _writer = writer; + } + + public void Run() + { + if (_writer.HasPendingWrites) + { + // execute a flush operation + _context.Flush(); + _writer.Reset(); + } + + // channel is still writing + if (_context.Channel.Active) + { + _context.Executor.Schedule(this, _interval); // reschedule + } + } + } } /// diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 7372c9f82b9..5d00ac15bde 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -327,6 +327,8 @@ private void SetInitialChannelPipeline(IChannel channel) pipeline.AddLast("FrameEncoder", new LengthFieldPrepender(Settings.ByteOrder, 4, 0, false)); } } + + pipeline.AddLast("BatchWriter", new BatchWriter()); } private void SetClientPipeline(IChannel channel, Address remoteAddress) From 0b474e64a08bef573d2b9c736997d1389e2c1b75 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2019 09:43:55 -0600 Subject: [PATCH 03/19] stash --- src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs | 2 ++ src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 5d00ac15bde..8a39bf0095e 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -240,6 +240,8 @@ public override async Task Shutdown() var tasks = new List(); foreach (var channel in ConnectionGroup) { + // flush any pending writes first + channel.Flush(); tasks.Add(channel.CloseAsync()); } var all = Task.WhenAll(tasks); diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index cd66b848834..14ca4817dd2 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -147,6 +147,7 @@ public override void ChannelActive(IChannelHandlerContext context) { InitOutbound(context.Channel, (IPEndPoint)context.Channel.RemoteAddress, null); base.ChannelActive(context); + } private void InitOutbound(IChannel channel, IPEndPoint socketAddress, object msg) @@ -182,12 +183,16 @@ private IByteBuffer ToByteBuffer(ByteString payload) { //TODO: optimize DotNetty byte buffer usage // (maybe custom IByteBuffer working directly on ByteString?) + //var buffer = _channel.Allocator.Buffer(payload.Length); + //payload.CopyTo(buffer.Array, buffer.ArrayOffset); + //buffer.SetWriterIndex(payload.Length).MarkWriterIndex(); var buffer = Unpooled.WrappedBuffer(payload.ToByteArray()); return buffer; } public override void Disassociate() { + _channel.Flush(); // flush before we close _channel.CloseAsync(); } } From a91cf3e769ca55c543f3ae02fca28fdd0a550160 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2019 11:51:17 -0600 Subject: [PATCH 04/19] Added batch triggers for byte size too --- .../Transport/DotNetty/AkkaLoggingHandler.cs | 76 ------------- .../Transport/DotNetty/BatchWriter.cs | 103 ++++++++++++++++++ 2 files changed, 103 insertions(+), 76 deletions(-) create mode 100644 src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs diff --git a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs index e1b5ff8c371..28a2a1dc808 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs @@ -18,82 +18,6 @@ namespace Akka.Remote.Transport.DotNetty { - internal class BatchWriter : ChannelHandlerAdapter - { - private readonly int _maxPendingWrites; - private readonly long _maxPendingMillis; - - public BatchWriter(int maxPendingWrites = 30, long maxPendingMillis = 40l) - { - _maxPendingWrites = maxPendingWrites; - _maxPendingMillis = maxPendingMillis; - } - - private int _currentPendingWrites = 0; - - public bool HasPendingWrites => _currentPendingWrites > 0; - - public override void HandlerAdded(IChannelHandlerContext context) - { - ScheduleFlush(context); - base.HandlerAdded(context); - } - - public override Task WriteAsync(IChannelHandlerContext context, object message) - { - var write = base.WriteAsync(context, message); - if (++_currentPendingWrites == _maxPendingWrites) - { - context.Flush(); - Reset(); - } - - return write; - } - - void ScheduleFlush(IChannelHandlerContext context) - { - // Schedule a recurring flush - only fires when there's writable data - var time = TimeSpan.FromMilliseconds(_maxPendingMillis); - var task = new FlushTask(context, time, this); - context.Executor.Schedule(task, time); - } - - public void Reset() - { - _currentPendingWrites = 0; - } - - class FlushTask : IRunnable - { - private readonly IChannelHandlerContext _context; - private readonly TimeSpan _interval; - private readonly BatchWriter _writer; - - public FlushTask(IChannelHandlerContext context, TimeSpan interval, BatchWriter writer) - { - _context = context; - _interval = interval; - _writer = writer; - } - - public void Run() - { - if (_writer.HasPendingWrites) - { - // execute a flush operation - _context.Flush(); - _writer.Reset(); - } - - // channel is still writing - if (_context.Channel.Active) - { - _context.Executor.Schedule(this, _interval); // reschedule - } - } - } - } /// /// INTERNAL API diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs new file mode 100644 index 00000000000..36b1b080c48 --- /dev/null +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -0,0 +1,103 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2019 Lightbend Inc. +// Copyright (C) 2013-2019 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using DotNetty.Buffers; +using DotNetty.Common.Concurrency; +using DotNetty.Transport.Channels; + +namespace Akka.Remote.Transport.DotNetty +{ + /// + /// INTERNAL API. + /// + /// Responsible for batching socket writes together into fewer sys calls to the socket. + /// + internal class BatchWriter : ChannelHandlerAdapter + { + private readonly int _maxPendingWrites; + private readonly int _maxPendingMillis; + private readonly int _maxPendingBytes; + + public BatchWriter(int maxPendingWrites = 20, int maxPendingMillis = 40, int maxPendingBytes = 128000) + { + _maxPendingWrites = maxPendingWrites; + _maxPendingMillis = maxPendingMillis; + _maxPendingBytes = maxPendingBytes; + } + + private int _currentPendingWrites = 0; + private long _currentPendingBytes; + + public bool HasPendingWrites => _currentPendingWrites > 0; + + public override void HandlerAdded(IChannelHandlerContext context) + { + ScheduleFlush(context); + base.HandlerAdded(context); + } + + public override Task WriteAsync(IChannelHandlerContext context, object message) + { + _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; + _currentPendingWrites++; + if (_currentPendingWrites >= _maxPendingWrites + || _currentPendingBytes >= _maxPendingBytes) + { + context.Flush(); + Reset(); + } + + return base.WriteAsync(context, message); + } + + void ScheduleFlush(IChannelHandlerContext context) + { + // Schedule a recurring flush - only fires when there's writable data + var time = TimeSpan.FromMilliseconds(_maxPendingMillis); + var task = new FlushTask(context, time, this); + context.Executor.Schedule(task, time); + } + + public void Reset() + { + _currentPendingWrites = 0; + _currentPendingBytes = 0; + } + + class FlushTask : IRunnable + { + private readonly IChannelHandlerContext _context; + private readonly TimeSpan _interval; + private readonly BatchWriter _writer; + + public FlushTask(IChannelHandlerContext context, TimeSpan interval, BatchWriter writer) + { + _context = context; + _interval = interval; + _writer = writer; + } + + public void Run() + { + if (_writer.HasPendingWrites) + { + // execute a flush operation + _context.Flush(); + _writer.Reset(); + } + + // channel is still writing + if (_context.Channel.Active) + { + _context.Executor.Schedule(this, _interval); // reschedule + } + } + } + } +} From 10f41d73621626caa4f9748cc1fcf9723c429ae9 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2019 11:53:13 -0600 Subject: [PATCH 05/19] use max frame size to determine flush limit --- src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 8a39bf0095e..a115b161fe7 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -330,7 +330,7 @@ private void SetInitialChannelPipeline(IChannel channel) } } - pipeline.AddLast("BatchWriter", new BatchWriter()); + pipeline.AddLast("BatchWriter", new BatchWriter(maxPendingBytes:Settings.MaxFrameSize)); } private void SetClientPipeline(IChannel channel, Address remoteAddress) From 25cff73d2cbe205ccb56450b96aa00f7928aab2e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2019 12:50:15 -0600 Subject: [PATCH 06/19] rewrote ToByteBuffer method as static with inlining --- src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index 14ca4817dd2..268d942c892 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -8,6 +8,7 @@ using System; using System.Net; using System.Net.Sockets; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; @@ -172,20 +173,18 @@ public override bool Write(ByteString payload) { if (_channel.Open) { - var data = ToByteBuffer(payload); + var data = ToByteBuffer(_channel, payload); _channel.WriteAsync(data); return true; } return false; } - private IByteBuffer ToByteBuffer(ByteString payload) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static IByteBuffer ToByteBuffer(IChannel channel, ByteString payload) { //TODO: optimize DotNetty byte buffer usage // (maybe custom IByteBuffer working directly on ByteString?) - //var buffer = _channel.Allocator.Buffer(payload.Length); - //payload.CopyTo(buffer.Array, buffer.ArrayOffset); - //buffer.SetWriterIndex(payload.Length).MarkWriterIndex(); var buffer = Unpooled.WrappedBuffer(payload.ToByteArray()); return buffer; } From 0794b069666558c520308e4adec486254cd45d48 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2019 14:29:50 -0600 Subject: [PATCH 07/19] made sure to push write down the pipeline before flush --- src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 36b1b080c48..bd9c9947055 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -44,6 +44,7 @@ public override void HandlerAdded(IChannelHandlerContext context) public override Task WriteAsync(IChannelHandlerContext context, object message) { + var write = base.WriteAsync(context, message); _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; _currentPendingWrites++; if (_currentPendingWrites >= _maxPendingWrites @@ -53,7 +54,7 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) Reset(); } - return base.WriteAsync(context, message); + return write; } void ScheduleFlush(IChannelHandlerContext context) From eebca2d1688589f16b957ae995c8208dd382bf51 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 14:38:06 -0600 Subject: [PATCH 08/19] changed semantics around how BatchWriter behaves under low traffic --- src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index bd9c9947055..4ae74aca84b 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -93,8 +93,8 @@ public void Run() _writer.Reset(); } - // channel is still writing - if (_context.Channel.Active) + // channel is still open + if (_context.Channel.Open) { _context.Executor.Schedule(this, _interval); // reschedule } From 01754f3585d7a75f0725bce87b27794846ba6878 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 14:42:01 -0600 Subject: [PATCH 09/19] added comment explaining why we call WriteAsync first before flush math --- src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 4ae74aca84b..f7c3df541e3 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -44,6 +44,14 @@ public override void HandlerAdded(IChannelHandlerContext context) public override Task WriteAsync(IChannelHandlerContext context, object message) { + /* + * Need to add the write to the rest of the pipeline first before we + * include it in the formula for determining whether or not we flush + * right now. The reason being is that if we did this the other way around, + * we could flush first before the write was in the "flushable" buffer and + * this can lead to "dangling writes" that never actually get transmitted + * across the network. + */ var write = base.WriteAsync(context, message); _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; _currentPendingWrites++; From 5bbe95b2254fb9895cdc1be8c7b23ec712372786 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 15:17:18 -0600 Subject: [PATCH 10/19] handle termination-flush internally --- src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs | 7 +++++++ .../Akka.Remote/Transport/DotNetty/DotNettyTransport.cs | 2 -- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index f7c3df541e3..01e172534a5 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -65,6 +65,13 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) return write; } + public override Task CloseAsync(IChannelHandlerContext context) + { + // flush any pending writes first + context.Flush(); + return base.CloseAsync(context); + } + void ScheduleFlush(IChannelHandlerContext context) { // Schedule a recurring flush - only fires when there's writable data diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index a115b161fe7..7301b08c79a 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -240,8 +240,6 @@ public override async Task Shutdown() var tasks = new List(); foreach (var channel in ConnectionGroup) { - // flush any pending writes first - channel.Flush(); tasks.Add(channel.CloseAsync()); } var all = Task.WhenAll(tasks); From 25ba123d4bec0625b24fc53d5b38feea5365e162 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 18:23:58 -0600 Subject: [PATCH 11/19] adding some tests to the batch writer --- .../Transport/DotNettyBatchWriterSpecs.cs | 49 +++++++++++++++++++ .../Transport/DotNetty/BatchWriter.cs | 19 +++---- 2 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs new file mode 100644 index 00000000000..9ac83669665 --- /dev/null +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Remote.Transport.DotNetty; +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Embedded; +using FluentAssertions; +using Xunit; + +namespace Akka.Remote.Tests.Transport +{ + public class DotNettyBatchWriterSpecs + { + class FlushHandler : ChannelHandlerAdapter + { + public ChannelOutboundBuffer Buffer { get; private set; } + + public override void Flush(IChannelHandlerContext context) + { + Buffer = context.Channel.Unsafe.OutboundBuffer; + } + } + + /// + /// Stay below the write / count and write / byte threshold. Rely on the timer. + /// + [Fact] + public async Task BatchWriter_should_succeed_with_timer() + { + var writer = new BatchWriter(); + var flushHandler = new FlushHandler(); + var ch = new EmbeddedChannel(writer); + + var ints = Enumerable.Range(0, 4).ToArray(); + foreach(var i in ints) + { + ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); + } + + ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(0); + ch.RunPendingTasks(); + await Task.Delay(writer.MaxPendingMillis * 2); + ch.OutboundMessages.Count.Should().Be(ints.Length); + } + } +} diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index 01e172534a5..ecc152106d7 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -20,15 +20,16 @@ namespace Akka.Remote.Transport.DotNetty /// internal class BatchWriter : ChannelHandlerAdapter { - private readonly int _maxPendingWrites; - private readonly int _maxPendingMillis; - private readonly int _maxPendingBytes; + // Made internal for testing purposes + internal readonly int MaxPendingWrites; + internal readonly int MaxPendingMillis; + internal readonly int MaxPendingBytes; public BatchWriter(int maxPendingWrites = 20, int maxPendingMillis = 40, int maxPendingBytes = 128000) { - _maxPendingWrites = maxPendingWrites; - _maxPendingMillis = maxPendingMillis; - _maxPendingBytes = maxPendingBytes; + MaxPendingWrites = maxPendingWrites; + MaxPendingMillis = maxPendingMillis; + MaxPendingBytes = maxPendingBytes; } private int _currentPendingWrites = 0; @@ -55,8 +56,8 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) var write = base.WriteAsync(context, message); _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; _currentPendingWrites++; - if (_currentPendingWrites >= _maxPendingWrites - || _currentPendingBytes >= _maxPendingBytes) + if (_currentPendingWrites >= MaxPendingWrites + || _currentPendingBytes >= MaxPendingBytes) { context.Flush(); Reset(); @@ -75,7 +76,7 @@ public override Task CloseAsync(IChannelHandlerContext context) void ScheduleFlush(IChannelHandlerContext context) { // Schedule a recurring flush - only fires when there's writable data - var time = TimeSpan.FromMilliseconds(_maxPendingMillis); + var time = TimeSpan.FromMilliseconds(MaxPendingMillis); var task = new FlushTask(context, time, this); context.Executor.Schedule(task, time); } From 61d24a952fe368a28e33996c0418ec6fd65071dd Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 20:30:10 -0600 Subject: [PATCH 12/19] fixed batching test --- .../Transport/DotNettyBatchWriterSpecs.cs | 66 +++++++++++++++---- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs index 9ac83669665..710e131bbad 100644 --- a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -1,29 +1,64 @@ -using System; +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2019 Lightbend Inc. +// Copyright (C) 2013-2019 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Akka.Remote.Transport.DotNetty; +using Akka.TestKit; using DotNetty.Buffers; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Embedded; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Akka.Remote.Tests.Transport { - public class DotNettyBatchWriterSpecs + public class DotNettyBatchWriterSpecs : AkkaSpec { - class FlushHandler : ChannelHandlerAdapter + public class FlushLogger : ChannelHandlerAdapter { - public ChannelOutboundBuffer Buffer { get; private set; } + public ITestOutputHelper Helper; + + private TaskCompletionSource _tcs = new TaskCompletionSource(); + + public Task Activated => _tcs.Task; + + public FlushLogger(ITestOutputHelper helper) + { + Helper = helper; + } + + public override void ChannelActive(IChannelHandlerContext context) + { + _tcs.TrySetResult(Done.Instance); + base.ChannelActive(context); + } public override void Flush(IChannelHandlerContext context) { - Buffer = context.Channel.Unsafe.OutboundBuffer; + Helper.WriteLine($"[{DateTime.UtcNow}] flushed"); + + base.Flush(context); } } + public FlushLogger Flush { get; } + + public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(output: helper, null) + { + Flush = new FlushLogger(helper); + } + + + /// /// Stay below the write / count and write / byte threshold. Rely on the timer. /// @@ -31,19 +66,28 @@ public override void Flush(IChannelHandlerContext context) public async Task BatchWriter_should_succeed_with_timer() { var writer = new BatchWriter(); - var flushHandler = new FlushHandler(); - var ch = new EmbeddedChannel(writer); - + var ch = new EmbeddedChannel(Flush, writer); + + await Flush.Activated; + var ints = Enumerable.Range(0, 4).ToArray(); foreach(var i in ints) { ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); } - ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(0); + // force write tasks to run ch.RunPendingTasks(); - await Task.Delay(writer.MaxPendingMillis * 2); - ch.OutboundMessages.Count.Should().Be(ints.Length); + + ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4); + ch.OutboundMessages.Count.Should().Be(0); + + await AwaitAssertAsync(() => + { + ch.RunPendingTasks(); // force scheduled task to run + ch.OutboundMessages.Count.Should().Be(ints.Length); + }, interval:100.Milliseconds()); + } } } From 95c6207a930060feb0f9f6d33c322e4c0662ecc2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 20:43:17 -0600 Subject: [PATCH 13/19] fixed issues with DotNetty batching spec --- .../Transport/DotNettyBatchWriterSpecs.cs | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs index 710e131bbad..fb27b62350a 100644 --- a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -52,7 +52,7 @@ public override void Flush(IChannelHandlerContext context) public FlushLogger Flush { get; } - public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(output: helper, null) + public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(helper) { Flush = new FlushLogger(helper); } @@ -70,10 +70,53 @@ public async Task BatchWriter_should_succeed_with_timer() await Flush.Activated; - var ints = Enumerable.Range(0, 4).ToArray(); - foreach(var i in ints) + /* + * Run multiple iterations to ensure that the batching mechanism doesn't become stale + */ + foreach (var n in Enumerable.Repeat(0, 3)) { - ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); + var ints = Enumerable.Range(0, 4).ToArray(); + foreach (var i in ints) + { + _ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); + } + + // force write tasks to run + ch.RunPendingTasks(); + + ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4); + ch.OutboundMessages.Count.Should().Be(0); + + await AwaitAssertAsync(() => + { + ch.RunPendingTasks(); // force scheduled task to run + ch.OutboundMessages.Count.Should().Be(ints.Length); + }, interval: 100.Milliseconds()); + + // reset the outbound queue + ch.OutboundMessages.Clear(); + } + } + + /// + /// Stay below the write / count and write / byte threshold. Rely on the timer. + /// + [Fact] + public async Task BatchWriter_should_flush_messages_during_shutdown() + { + var writer = new BatchWriter(); + var ch = new EmbeddedChannel(Flush, writer); + + await Flush.Activated; + + /* + * Run multiple iterations to ensure that the batching mechanism doesn't become stale + */ + + var ints = Enumerable.Range(0, 10).ToArray(); + foreach (var i in ints) + { + _ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i)); } // force write tasks to run @@ -82,11 +125,15 @@ public async Task BatchWriter_should_succeed_with_timer() ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4); ch.OutboundMessages.Count.Should().Be(0); + // close channels + _ = ch.CloseAsync(); + await AwaitAssertAsync(() => { ch.RunPendingTasks(); // force scheduled task to run ch.OutboundMessages.Count.Should().Be(ints.Length); - }, interval:100.Milliseconds()); + }, interval: 100.Milliseconds()); + } } From b1f0070c2fad2b2f8937daed2d8f0037ebdf25b9 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 20:45:45 -0600 Subject: [PATCH 14/19] don't use built-in mechanisms to determine scheduling --- src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index ecc152106d7..cfee191627b 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -25,6 +25,8 @@ internal class BatchWriter : ChannelHandlerAdapter internal readonly int MaxPendingMillis; internal readonly int MaxPendingBytes; + internal bool CanSchedule { get; private set; } = true; + public BatchWriter(int maxPendingWrites = 20, int maxPendingMillis = 40, int maxPendingBytes = 128000) { MaxPendingWrites = maxPendingWrites; @@ -70,6 +72,7 @@ public override Task CloseAsync(IChannelHandlerContext context) { // flush any pending writes first context.Flush(); + CanSchedule = false; return base.CloseAsync(context); } @@ -109,11 +112,8 @@ public void Run() _writer.Reset(); } - // channel is still open - if (_context.Channel.Open) - { + if(_writer.CanSchedule) _context.Executor.Schedule(this, _interval); // reschedule - } } } } From 6fe16f17ad94703c0a3d18f9f90db2a522929e69 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 20 Jan 2020 22:56:46 -0600 Subject: [PATCH 15/19] debugging RemoteDeliverySpec --- src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs b/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs index 42fea6f326a..3edf6089cd9 100644 --- a/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs +++ b/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs @@ -21,7 +21,7 @@ public RemoteDeliveryMultiNetSpec() Second = Role("second"); Third = Role("third"); - CommonConfig = DebugConfig(false); + CommonConfig = DebugConfig(true); } public RoleName First { get; } From 82e50a6e605aae6fa1c160ad7d52da46eb85bb05 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 21 Jan 2020 12:17:46 -0600 Subject: [PATCH 16/19] added BatchWriterSettings class --- .../Transport/DotNetty/BatchWriter.cs | 101 ++++++++++++++---- 1 file changed, 83 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index cfee191627b..abcc1e507db 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -7,12 +7,74 @@ using System; using System.Threading.Tasks; +using Akka.Configuration; using DotNetty.Buffers; using DotNetty.Common.Concurrency; using DotNetty.Transport.Channels; namespace Akka.Remote.Transport.DotNetty { + /// + /// INTERNAL API. + /// + /// Configuration object for + /// + internal class BatchWriterSettings + { + public const int DefaultMaxPendingWrites = 30; + public const int DefaultMaxPendingBytes = 16000; + public static readonly TimeSpan DefaultFlushInterval = TimeSpan.FromMilliseconds(40); + + public BatchWriterSettings(Config hocon) + { + EnableBatching = hocon.GetBoolean("enabled", true); + MaxPendingWrites = hocon.GetInt("max-pending-writes", DefaultMaxPendingWrites); + MaxPendingBytes = hocon.GetInt("max-pending-bytes", DefaultMaxPendingBytes); + FlushInterval = hocon.GetTimeSpan("flush-interval", DefaultFlushInterval, false); + } + + public BatchWriterSettings(TimeSpan? maxDuration = null, bool enableBatching = true, + int maxPendingWrites = DefaultMaxPendingWrites, int maxPendingBytes = DefaultMaxPendingBytes) + { + EnableBatching = enableBatching; + MaxPendingWrites = maxPendingWrites; + FlushInterval = maxDuration ?? DefaultFlushInterval; + MaxPendingBytes = maxPendingBytes; + } + + /// + /// Toggle for turning this feature on or off. + /// + /// + /// Defaults to true. + /// + public bool EnableBatching { get; } + + /// + /// The maximum amount of buffered writes that can be buffered before flushing I/O. + /// + /// + /// Defaults to 30. + /// + public int MaxPendingWrites { get; } + + /// + /// In the event of low-traffic channels, the maximum amount of time we'll wait before flushing writes. + /// + /// + /// Defaults to 40 milliseconds. + /// + public TimeSpan FlushInterval { get; } + + /// + /// The maximum number of outstanding bytes that can be written prior to a flush. + /// + /// + /// Defaults to 16kb. + /// + public int MaxPendingBytes { get; } + } + /// /// INTERNAL API. /// @@ -20,18 +82,13 @@ namespace Akka.Remote.Transport.DotNetty /// internal class BatchWriter : ChannelHandlerAdapter { - // Made internal for testing purposes - internal readonly int MaxPendingWrites; - internal readonly int MaxPendingMillis; - internal readonly int MaxPendingBytes; + public readonly BatchWriterSettings Settings; internal bool CanSchedule { get; private set; } = true; - public BatchWriter(int maxPendingWrites = 20, int maxPendingMillis = 40, int maxPendingBytes = 128000) + public BatchWriter(BatchWriterSettings settings) { - MaxPendingWrites = maxPendingWrites; - MaxPendingMillis = maxPendingMillis; - MaxPendingBytes = maxPendingBytes; + Settings = settings; } private int _currentPendingWrites = 0; @@ -41,7 +98,8 @@ public BatchWriter(int maxPendingWrites = 20, int maxPendingMillis = 40, int max public override void HandlerAdded(IChannelHandlerContext context) { - ScheduleFlush(context); + if(Settings.EnableBatching) + ScheduleFlush(context); // only schedule flush operations when batching is enabled base.HandlerAdded(context); } @@ -56,14 +114,22 @@ public override Task WriteAsync(IChannelHandlerContext context, object message) * across the network. */ var write = base.WriteAsync(context, message); - _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; - _currentPendingWrites++; - if (_currentPendingWrites >= MaxPendingWrites - || _currentPendingBytes >= MaxPendingBytes) + if (Settings.EnableBatching) + { + _currentPendingBytes += ((IByteBuffer)message).ReadableBytes; + _currentPendingWrites++; + if (_currentPendingWrites >= Settings.MaxPendingWrites + || _currentPendingBytes >= Settings.MaxPendingBytes) + { + context.Flush(); + Reset(); + } + } + else { context.Flush(); - Reset(); } + return write; } @@ -76,12 +142,11 @@ public override Task CloseAsync(IChannelHandlerContext context) return base.CloseAsync(context); } - void ScheduleFlush(IChannelHandlerContext context) + private void ScheduleFlush(IChannelHandlerContext context) { // Schedule a recurring flush - only fires when there's writable data - var time = TimeSpan.FromMilliseconds(MaxPendingMillis); - var task = new FlushTask(context, time, this); - context.Executor.Schedule(task, time); + var task = new FlushTask(context, Settings.FlushInterval, this); + context.Executor.Schedule(task, Settings.FlushInterval); } public void Reset() From 18dcce672b6d9b56c7ba6b375be2e01373b3117f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 21 Jan 2020 13:40:51 -0600 Subject: [PATCH 17/19] added HOCON configuration for tuning the batching system --- .../Akka.Remote.Tests/RemoteConfigSpec.cs | 14 ++++++- .../Transport/DotNettyBatchWriterSpecs.cs | 4 +- .../Akka.Remote/Configuration/Remote.conf | 38 ++++++++++++++++++- .../Transport/DotNetty/BatchWriter.cs | 8 ++-- .../Transport/DotNetty/DotNettyTransport.cs | 2 +- .../DotNetty/DotNettyTransportSettings.cs | 14 ++++++- 6 files changed, 68 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs index 73d2129203b..4f22f42bc76 100644 --- a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs +++ b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs @@ -165,7 +165,19 @@ public void Remoting_should_contain_correct_hostname_values_in_ReferenceConf() //Non-specified hostnames should default to IPAddress.Any Assert.Equal(IPAddress.Any.ToString(), s.Hostname); Assert.Equal(IPAddress.Any.ToString(), s.PublicHostname); - } + } + + [Fact] + public void Remoting_should_contain_correct_BatchWriter_settings_in_ReferenceConf() + { + var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp"); + var s = DotNettyTransportSettings.Create(c); + + s.BatchWriterSettings.EnableBatching.Should().BeTrue(); + s.BatchWriterSettings.FlushInterval.Should().Be(BatchWriterSettings.DefaultFlushInterval); + s.BatchWriterSettings.MaxPendingBytes.Should().Be(BatchWriterSettings.DefaultMaxPendingBytes); + s.BatchWriterSettings.MaxPendingWrites.Should().Be(BatchWriterSettings.DefaultMaxPendingWrites); + } } } diff --git a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs index fb27b62350a..abf82afa4df 100644 --- a/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs +++ b/src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs @@ -65,7 +65,7 @@ public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(helper) [Fact] public async Task BatchWriter_should_succeed_with_timer() { - var writer = new BatchWriter(); + var writer = new BatchWriter(new BatchWriterSettings()); var ch = new EmbeddedChannel(Flush, writer); await Flush.Activated; @@ -104,7 +104,7 @@ await AwaitAssertAsync(() => [Fact] public async Task BatchWriter_should_flush_messages_during_shutdown() { - var writer = new BatchWriter(); + var writer = new BatchWriter(new BatchWriterSettings()); var ch = new EmbeddedChannel(Flush, writer); await Flush.Activated; diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 08c1fc87bc0..d41c784aa8f 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -437,6 +437,42 @@ akka { # for the following bug: https://github.com/akkadotnet/akka.net/issues/3370 enable-pooling = true + # PERFORMANCE TUNING + # + # The batching feature of DotNetty is designed to help batch together logical writes into a smaller + # number of physical writes across the socket. This helps significantly reduce the number of system calls + # and can improve performance by as much as 150% on some systems when enabled. + batching{ + + # Enables the batching system. When disabled, every write will be flushed immediately. + # Disable this setting if you're working with a VERY low-traffic system that requires + # fast (< 40ms) acknowledgement for all periodic messages. + enabled = true + + # The max write threshold based on the number of logical messages regardless of their size. + # This is a safe default value - decrease it if you have a small number of remote actors + # who engage in frequent request->response communication which requires low latency (< 40ms). + max-pending-writes = 30 + + # The max write threshold based on the byte size of all buffered messages. If there are 4 messages + # waiting to be written (with batching.max-pending-writes = 30) but their total size is greater than + # batching.max-pending-bytes, a flush will be triggered immediately. + # + # Increase this value is you have larger message sizes and watch to take advantage of batching, but + # otherwise leave it as-is. + # + # NOTE: this value should always be smaller than dot-netty.tcp.maximum-frame-size. + max-pending-bytes = 16k + + # In the event that neither the batching.max-pending-writes or batching.max-pending-bytes + # is hit we guarantee that all pending writes will be flushed within this interval. + # + # This setting, realistically, can't be enforced any lower than the OS' clock resolution (~20ms). + # If you have a very low-traffic system, either disable pooling altogether or lower the batching.max-pending-writes + # threshold to maximize throughput. Otherwise, leave this setting as-is. + flush-interval = 40ms + } + # If set to "" then the specified dispatcher # will be used to accept inbound connections, and perform IO. If "" then # dedicated threads will be used. @@ -585,8 +621,6 @@ akka { thread-count = 4 } } - - } } \ No newline at end of file diff --git a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs index abcc1e507db..de39d7575be 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/BatchWriter.cs @@ -22,19 +22,19 @@ namespace Akka.Remote.Transport.DotNetty internal class BatchWriterSettings { public const int DefaultMaxPendingWrites = 30; - public const int DefaultMaxPendingBytes = 16000; + public const long DefaultMaxPendingBytes = 16 * 1024L; public static readonly TimeSpan DefaultFlushInterval = TimeSpan.FromMilliseconds(40); public BatchWriterSettings(Config hocon) { EnableBatching = hocon.GetBoolean("enabled", true); MaxPendingWrites = hocon.GetInt("max-pending-writes", DefaultMaxPendingWrites); - MaxPendingBytes = hocon.GetInt("max-pending-bytes", DefaultMaxPendingBytes); + MaxPendingBytes = hocon.GetByteSize("max-pending-bytes") ?? DefaultMaxPendingBytes; FlushInterval = hocon.GetTimeSpan("flush-interval", DefaultFlushInterval, false); } public BatchWriterSettings(TimeSpan? maxDuration = null, bool enableBatching = true, - int maxPendingWrites = DefaultMaxPendingWrites, int maxPendingBytes = DefaultMaxPendingBytes) + int maxPendingWrites = DefaultMaxPendingWrites, long maxPendingBytes = DefaultMaxPendingBytes) { EnableBatching = enableBatching; MaxPendingWrites = maxPendingWrites; @@ -72,7 +72,7 @@ public BatchWriterSettings(TimeSpan? maxDuration = null, bool enableBatching = t /// /// Defaults to 16kb. /// - public int MaxPendingBytes { get; } + public long MaxPendingBytes { get; } } /// diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 7301b08c79a..640faf07fdb 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -328,7 +328,7 @@ private void SetInitialChannelPipeline(IChannel channel) } } - pipeline.AddLast("BatchWriter", new BatchWriter(maxPendingBytes:Settings.MaxFrameSize)); + pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings)); } private void SetClientPipeline(IChannel channel, Address remoteAddress) diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs index b70604cbbce..ba295a7329c 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs @@ -69,6 +69,8 @@ public static DotNettyTransportSettings Create(Config config) default: throw new ArgumentException($"Unknown byte-order option [{byteOrderString}]. Supported options are: big-endian, little-endian."); } + var batchWriterSettings = new BatchWriterSettings(config.GetConfig("batching")); + return new DotNettyTransportSettings( transportMode: transportMode == "tcp" ? TransportMode.Tcp : TransportMode.Udp, enableSsl: config.GetBoolean("enable-ssl", false), @@ -94,7 +96,8 @@ public static DotNettyTransportSettings Create(Config config) backwardsCompatibilityModeEnabled: config.GetBoolean("enable-backwards-compatibility", false), logTransport: config.HasPath("log-transport") && config.GetBoolean("log-transport"), byteOrder: order, - enableBufferPooling: config.GetBoolean("enable-pooling", true)); + enableBufferPooling: config.GetBoolean("enable-pooling", true), + batchWriterSettings); } private static int? ToNullableInt(long? value) => value.HasValue && value.Value > 0 ? (int?)value.Value : null; @@ -230,10 +233,16 @@ private static int ComputeWorkerPoolSize(Config config) /// public readonly bool EnableBufferPooling; + /// + /// Used for performance-tuning the DotNetty channels to maximize I/O performance. + /// + public readonly BatchWriterSettings BatchWriterSettings; + public DotNettyTransportSettings(TransportMode transportMode, bool enableSsl, TimeSpan connectTimeout, string hostname, string publicHostname, int port, int? publicPort, int serverSocketWorkerPoolSize, int clientSocketWorkerPoolSize, int maxFrameSize, SslSettings ssl, bool dnsUseIpv6, bool tcpReuseAddr, bool tcpKeepAlive, bool tcpNoDelay, int backlog, bool enforceIpFamily, - int? receiveBufferSize, int? sendBufferSize, int? writeBufferHighWaterMark, int? writeBufferLowWaterMark, bool backwardsCompatibilityModeEnabled, bool logTransport, ByteOrder byteOrder, bool enableBufferPooling) + int? receiveBufferSize, int? sendBufferSize, int? writeBufferHighWaterMark, int? writeBufferLowWaterMark, bool backwardsCompatibilityModeEnabled, bool logTransport, ByteOrder byteOrder, + bool enableBufferPooling, BatchWriterSettings batchWriterSettings) { if (maxFrameSize < 32000) throw new ArgumentException("maximum-frame-size must be at least 32000 bytes", nameof(maxFrameSize)); @@ -262,6 +271,7 @@ public DotNettyTransportSettings(TransportMode transportMode, bool enableSsl, Ti LogTransport = logTransport; ByteOrder = byteOrder; EnableBufferPooling = enableBufferPooling; + BatchWriterSettings = batchWriterSettings; } } internal enum TransportMode From 038434693ef4a970d661e728401f1a64b85e538e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 21 Jan 2020 13:42:50 -0600 Subject: [PATCH 18/19] disable batching inside problematic Akka.Remote tests --- src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs | 1 + src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs index 44aa1b2343a..e53a52f9310 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs @@ -32,6 +32,7 @@ public NodeChurnConfig() .WithFallback(ConfigurationFactory.ParseString(@" akka.cluster.auto-down-unreachable-after = 1s akka.remote.log-frame-size-exceeding = 2000b + akka.remote.dot-netty.tcp.batching.enabled = false # disable batching ")) .WithFallback(MultiNodeClusterSpec.ClusterConfig()); } diff --git a/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs b/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs index 3edf6089cd9..dec3149ac93 100644 --- a/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs +++ b/src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using Akka.Actor; +using Akka.Configuration; using Akka.Remote.TestKit; namespace Akka.Remote.Tests.MultiNode @@ -21,7 +22,10 @@ public RemoteDeliveryMultiNetSpec() Second = Role("second"); Third = Role("third"); - CommonConfig = DebugConfig(true); + CommonConfig = DebugConfig(true) + .WithFallback(ConfigurationFactory.ParseString(@" + akka.remote.dot-netty.tcp.batching.enabled = false # disable batching + ")); } public RoleName First { get; } From 0944a1532aea6f1c0c62035b876cb89b537be038 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 21 Jan 2020 13:59:20 -0600 Subject: [PATCH 19/19] fix C#7 compilation issue --- .../Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs index ba295a7329c..f33f1afa8ff 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransportSettings.cs @@ -97,7 +97,7 @@ public static DotNettyTransportSettings Create(Config config) logTransport: config.HasPath("log-transport") && config.GetBoolean("log-transport"), byteOrder: order, enableBufferPooling: config.GetBoolean("enable-pooling", true), - batchWriterSettings); + batchWriterSettings: batchWriterSettings); } private static int? ToNullableInt(long? value) => value.HasValue && value.Value > 0 ? (int?)value.Value : null;