Skip to content

Commit

Permalink
Akka.Remote: improved write performance with DotNetty flush-batching (#…
Browse files Browse the repository at this point in the history
…4106)

* adding DotNetty write batching support

* added HOCON configuration for tuning the batching system
  • Loading branch information
Aaronontheweb committed Jan 21, 2020
1 parent 1191a20 commit cd11064
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Cluster.Tests.MultiNode/NodeChurnSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public NodeChurnConfig()
.WithFallback(ConfigurationFactory.ParseString(@"
akka.cluster.auto-down-unreachable-after = 1s
akka.remote.log-frame-size-exceeding = 2000b
akka.remote.dot-netty.tcp.batching.enabled = false # disable batching
"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());
}
Expand Down
6 changes: 5 additions & 1 deletion src/core/Akka.Remote.Tests.MultiNode/RemoteDeliverySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
using Akka.Remote.TestKit;

namespace Akka.Remote.Tests.MultiNode
Expand All @@ -21,7 +22,10 @@ public RemoteDeliveryMultiNetSpec()
Second = Role("second");
Third = Role("third");

CommonConfig = DebugConfig(false);
CommonConfig = DebugConfig(true)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.remote.dot-netty.tcp.batching.enabled = false # disable batching
"));
}

public RoleName First { get; }
Expand Down
14 changes: 13 additions & 1 deletion src/core/Akka.Remote.Tests/RemoteConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,19 @@ public void Remoting_should_contain_correct_hostname_values_in_ReferenceConf()
//Non-specified hostnames should default to IPAddress.Any
Assert.Equal(IPAddress.Any.ToString(), s.Hostname);
Assert.Equal(IPAddress.Any.ToString(), s.PublicHostname);
}
}

[Fact]
public void Remoting_should_contain_correct_BatchWriter_settings_in_ReferenceConf()
{
var c = RARP.For(Sys).Provider.RemoteSettings.Config.GetConfig("akka.remote.dot-netty.tcp");
var s = DotNettyTransportSettings.Create(c);

s.BatchWriterSettings.EnableBatching.Should().BeTrue();
s.BatchWriterSettings.FlushInterval.Should().Be(BatchWriterSettings.DefaultFlushInterval);
s.BatchWriterSettings.MaxPendingBytes.Should().Be(BatchWriterSettings.DefaultMaxPendingBytes);
s.BatchWriterSettings.MaxPendingWrites.Should().Be(BatchWriterSettings.DefaultMaxPendingWrites);
}
}
}

140 changes: 140 additions & 0 deletions src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//-----------------------------------------------------------------------
// <copyright file="DotNettyBatchWriterSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2019 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2019 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Remote.Transport.DotNetty;
using Akka.TestKit;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Embedded;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Remote.Tests.Transport
{
public class DotNettyBatchWriterSpecs : AkkaSpec
{
public class FlushLogger : ChannelHandlerAdapter
{
public ITestOutputHelper Helper;

private TaskCompletionSource<Done> _tcs = new TaskCompletionSource<Done>();

public Task Activated => _tcs.Task;

public FlushLogger(ITestOutputHelper helper)
{
Helper = helper;
}

public override void ChannelActive(IChannelHandlerContext context)
{
_tcs.TrySetResult(Done.Instance);
base.ChannelActive(context);
}

public override void Flush(IChannelHandlerContext context)
{
Helper.WriteLine($"[{DateTime.UtcNow}] flushed");

base.Flush(context);
}
}

public FlushLogger Flush { get; }

public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(helper)
{
Flush = new FlushLogger(helper);
}



/// <summary>
/// Stay below the write / count and write / byte threshold. Rely on the timer.
/// </summary>
[Fact]
public async Task BatchWriter_should_succeed_with_timer()
{
var writer = new BatchWriter(new BatchWriterSettings());
var ch = new EmbeddedChannel(Flush, writer);

await Flush.Activated;

/*
* Run multiple iterations to ensure that the batching mechanism doesn't become stale
*/
foreach (var n in Enumerable.Repeat(0, 3))
{
var ints = Enumerable.Range(0, 4).ToArray();
foreach (var i in ints)
{
_ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i));
}

// force write tasks to run
ch.RunPendingTasks();

ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4);
ch.OutboundMessages.Count.Should().Be(0);

await AwaitAssertAsync(() =>
{
ch.RunPendingTasks(); // force scheduled task to run
ch.OutboundMessages.Count.Should().Be(ints.Length);
}, interval: 100.Milliseconds());

// reset the outbound queue
ch.OutboundMessages.Clear();
}
}

/// <summary>
/// Stay below the write / count and write / byte threshold. Rely on the timer.
/// </summary>
[Fact]
public async Task BatchWriter_should_flush_messages_during_shutdown()
{
var writer = new BatchWriter(new BatchWriterSettings());
var ch = new EmbeddedChannel(Flush, writer);

await Flush.Activated;

/*
* Run multiple iterations to ensure that the batching mechanism doesn't become stale
*/

var ints = Enumerable.Range(0, 10).ToArray();
foreach (var i in ints)
{
_ = ch.WriteAsync(Unpooled.Buffer(1).WriteInt(i));
}

// force write tasks to run
ch.RunPendingTasks();

ch.Unsafe.OutboundBuffer.TotalPendingWriteBytes().Should().Be(ints.Length * 4);
ch.OutboundMessages.Count.Should().Be(0);

// close channels
_ = ch.CloseAsync();

await AwaitAssertAsync(() =>
{
ch.RunPendingTasks(); // force scheduled task to run
ch.OutboundMessages.Count.Should().Be(ints.Length);
}, interval: 100.Milliseconds());


}
}
}
38 changes: 36 additions & 2 deletions src/core/Akka.Remote/Configuration/Remote.conf
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,42 @@ akka {
# for the following bug: https://github.com/akkadotnet/akka.net/issues/3370
enable-pooling = true

# PERFORMANCE TUNING
#
# The batching feature of DotNetty is designed to help batch together logical writes into a smaller
# number of physical writes across the socket. This helps significantly reduce the number of system calls
# and can improve performance by as much as 150% on some systems when enabled.
batching{

# Enables the batching system. When disabled, every write will be flushed immediately.
# Disable this setting if you're working with a VERY low-traffic system that requires
# fast (< 40ms) acknowledgement for all periodic messages.
enabled = true

# The max write threshold based on the number of logical messages regardless of their size.
# This is a safe default value - decrease it if you have a small number of remote actors
# who engage in frequent request->response communication which requires low latency (< 40ms).
max-pending-writes = 30

# The max write threshold based on the byte size of all buffered messages. If there are 4 messages
# waiting to be written (with batching.max-pending-writes = 30) but their total size is greater than
# batching.max-pending-bytes, a flush will be triggered immediately.
#
# Increase this value is you have larger message sizes and watch to take advantage of batching, but
# otherwise leave it as-is.
#
# NOTE: this value should always be smaller than dot-netty.tcp.maximum-frame-size.
max-pending-bytes = 16k

# In the event that neither the batching.max-pending-writes or batching.max-pending-bytes
# is hit we guarantee that all pending writes will be flushed within this interval.
#
# This setting, realistically, can't be enforced any lower than the OS' clock resolution (~20ms).
# If you have a very low-traffic system, either disable pooling altogether or lower the batching.max-pending-writes
# threshold to maximize throughput. Otherwise, leave this setting as-is.
flush-interval = 40ms
}

# If set to "<id.of.dispatcher>" then the specified dispatcher
# will be used to accept inbound connections, and perform IO. If "" then
# dedicated threads will be used.
Expand Down Expand Up @@ -585,8 +621,6 @@ akka {
thread-count = 4
}
}


}

}
3 changes: 3 additions & 0 deletions src/core/Akka.Remote/Transport/DotNetty/AkkaLoggingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
using System;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using DotNetty.Buffers;
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;
using ILoggingAdapter = Akka.Event.ILoggingAdapter;

namespace Akka.Remote.Transport.DotNetty
{

/// <summary>
/// INTERNAL API
///
Expand Down
Loading

0 comments on commit cd11064

Please sign in to comment.