Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Remote memory leak fix: use singleton allocators for DotNetty channels. #3436

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ internal abstract class DotNettyTransport : Transport
protected volatile Address LocalAddress;
protected internal volatile IChannel ServerChannel;

private readonly IEventLoopGroup serverEventLoopGroup;
private readonly IEventLoopGroup clientEventLoopGroup;
private readonly IEventLoopGroup _serverEventLoopGroup;
private readonly IEventLoopGroup _clientEventLoopGroup;

protected DotNettyTransport(ActorSystem system, Config config)
{
Expand All @@ -141,8 +141,8 @@ protected DotNettyTransport(ActorSystem system, Config config)

Settings = DotNettyTransportSettings.Create(config);
Log = Logging.GetLogger(System, GetType());
serverEventLoopGroup = new MultithreadEventLoopGroup(Settings.ServerSocketWorkerPoolSize);
clientEventLoopGroup = new MultithreadEventLoopGroup(Settings.ClientSocketWorkerPoolSize);
_serverEventLoopGroup = new MultithreadEventLoopGroup(Settings.ServerSocketWorkerPoolSize);
_clientEventLoopGroup = new MultithreadEventLoopGroup(Settings.ClientSocketWorkerPoolSize);
ConnectionGroup = new ConcurrentSet<IChannel>();
AssociationListenerPromise = new TaskCompletionSource<IAssociationEventListener>();

Expand All @@ -161,8 +161,7 @@ protected async Task<IChannel> NewServer(EndPoint listenAddress)
if (InternalTransport != TransportMode.Tcp)
throw new NotImplementedException("Haven't implemented UDP transport at this time");

var dns = listenAddress as DnsEndPoint;
if (dns != null)
if (listenAddress is DnsEndPoint dns)
{
listenAddress = await DnsToIPEndpoint(dns).ConfigureAwait(false);
}
Expand Down Expand Up @@ -256,8 +255,8 @@ public override async Task<bool> Shutdown()
// free all of the connection objects we were holding onto
ConnectionGroup.Clear();
#pragma warning disable 4014 // shutting down the worker groups can take up to 10 seconds each. Let that happen asnychronously.
clientEventLoopGroup.ShutdownGracefullyAsync();
serverEventLoopGroup.ShutdownGracefullyAsync();
_clientEventLoopGroup.ShutdownGracefullyAsync();
_serverEventLoopGroup.ShutdownGracefullyAsync();
#pragma warning restore 4014
}
}
Expand All @@ -270,13 +269,13 @@ protected Bootstrap ClientFactory(Address remoteAddress)
var addressFamily = Settings.DnsUseIpv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork;

var client = new Bootstrap()
.Group(clientEventLoopGroup)
.Group(_clientEventLoopGroup)
.Option(ChannelOption.SoReuseaddr, Settings.TcpReuseAddr)
.Option(ChannelOption.SoKeepalive, Settings.TcpKeepAlive)
.Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay)
.Option(ChannelOption.ConnectTimeout, Settings.ConnectTimeout)
.Option(ChannelOption.AutoRead, false)
.Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)new PooledByteBufferAllocator() : new UnpooledByteBufferAllocator())
.Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default)
.ChannelFactory(() => Settings.EnforceIpFamily
? new TcpSocketChannel(addressFamily)
: new TcpSocketChannel())
Expand Down Expand Up @@ -379,13 +378,13 @@ private ServerBootstrap ServerFactory()
var addressFamily = Settings.DnsUseIpv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork;

var server = new ServerBootstrap()
.Group(serverEventLoopGroup)
.Group(_serverEventLoopGroup)
.Option(ChannelOption.SoReuseaddr, Settings.TcpReuseAddr)
.Option(ChannelOption.SoKeepalive, Settings.TcpKeepAlive)
.Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay)
.Option(ChannelOption.AutoRead, false)
.Option(ChannelOption.SoBacklog, Settings.Backlog)
.Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)new PooledByteBufferAllocator() : new UnpooledByteBufferAllocator())
.Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would definitely solve the allocator issue for clients... but the server is where we detected this memory leak and IIRC, this ServerBootstrap gets re-used to recreate client connections over and over again. Thus, I'm not sure this would actually impact the server. Bleh.

.ChannelFactory(() => Settings.EnforceIpFamily
? new TcpServerSocketChannel(addressFamily)
: new TcpServerSocketChannel())
Expand Down