Skip to content

Commit

Permalink
Fix UDP memory leak (#5404)
Browse files Browse the repository at this point in the history
* Fix UDP memory leak

* Revert changes that are not connected to UDP

* Code comment cleanup

* Update API Approval list

* Remove SocketEventArgsPool functionality

* Fix specs

* Add documentation in comments to describe the architectural changes, use Assert.Debug instead of throwing.

* Remove the logger, we're using Debug.Assert instead

* Clean up assert code

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored Dec 2, 2021
1 parent fe0f10a commit a920b07
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 191 deletions.
12 changes: 10 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3232,8 +3232,17 @@ namespace Akka.IO.Buffers
public BufferPoolAllocationException(string message) { }
protected BufferPoolAllocationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class BufferPoolInfo
{
public BufferPoolInfo(System.Type type, long totalSize, long free, long used) { }
public long Free { get; }
public long TotalSize { get; }
public System.Type Type { get; }
public long Used { get; }
}
public interface IBufferPool
{
Akka.IO.Buffers.BufferPoolInfo Diagnostics();
void Release(System.ArraySegment<byte> buf);
void Release(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buf);
System.ArraySegment<byte> Rent();
Expand Down Expand Up @@ -3354,6 +3363,7 @@ namespace Akka.IO
}
public interface ISocketEventArgsPool
{
Akka.IO.Buffers.BufferPoolInfo BufferPoolInfo { get; }
System.Net.Sockets.SocketAsyncEventArgs Acquire(Akka.Actor.IActorRef actor);
void Release(System.Net.Sockets.SocketAsyncEventArgs e);
}
Expand Down Expand Up @@ -3876,14 +3886,12 @@ namespace Akka.IO
{
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { }
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }
public Akka.IO.Buffers.IBufferPool BufferPool { get; }
public override Akka.Actor.IActorRef Manager { get; }
}
public class UdpExt : Akka.IO.IOExtension
{
public UdpExt(Akka.Actor.ExtendedActorSystem system) { }
public UdpExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }
public Akka.IO.Buffers.IBufferPool BufferPool { get; }
public override Akka.Actor.IActorRef Manager { get; }
}
public class static UdpExtensions
Expand Down
71 changes: 67 additions & 4 deletions src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
using System;
using System.Linq;
using System.Net;
using System.Threading;
using Akka.Actor;
using Akka.IO;
using Akka.IO.Buffers;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
Expand All @@ -26,12 +28,20 @@ public UdpConnectedIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
akka.io.udp-connected.buffer-pool = ""akka.io.udp-connected.direct-buffer-pool""
akka.io.udp-connected.nr-of-selectors = 1
akka.io.udp-connected.direct-buffer-pool-limit = 100
akka.io.udp-connected.direct-buffer-size = 1024
# This comes out to be about 1.6 Mib maximum total buffer size
akka.io.udp-connected.direct-buffer-pool.buffer-size = 512
akka.io.udp-connected.direct-buffer-pool.buffers-per-segment = 32
akka.io.udp-connected.direct-buffer-pool.buffer-pool-limit = 100
akka.io.udp.buffer-pool = ""akka.io.udp.direct-buffer-pool""
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024
# This comes out to be about 1.6 Mib maximum total buffer size
akka.io.udp.direct-buffer-pool.buffer-size = 512
akka.io.udp.direct-buffer-pool.buffers-per-segment = 32
akka.io.udp.direct-buffer-pool.buffer-pool-limit = 100
akka.io.udp.trace-logging = true
akka.loglevel = DEBUG", output)
{
Expand Down Expand Up @@ -141,5 +151,58 @@ public void The_UDP_connection_oriented_implementation_must_to_send_batch_writes
msgs = raw.Cast<UdpConnected.Received>();
msgs.Sum(x => x.Data.Count).Should().Be(data.Count * 3);
}

[Fact]
public void The_UDP_connection_oriented_implementation_must_not_leak_memory()
{
const int batchCount = 2000;
const int batchSize = 100;

var serverAddress = _addresses[0];
var clientAddress = _addresses[1];
var udpConnection = UdpConnected.Instance.Apply(Sys);

var poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);

var server = CreateTestProbe();
udpConnection.Manager.Tell(new UdpConnected.Connect(server, clientAddress, serverAddress), server);
server.ExpectMsg<UdpConnected.Connected>();
var serverEp = server.LastSender;

var client = CreateTestProbe();
udpConnection.Manager.Tell(new UdpConnected.Connect(client, serverAddress, clientAddress), client);
client.ExpectMsg<UdpConnected.Connected>();
var clientEp = client.LastSender;

var data = ByteString.FromString("Fly little packet!");

// send a lot of packets through, the byte buffer pool should not leak anything
for (var n = 0; n < batchCount; ++n)
{
for (var j = 0; j < batchSize; ++j)
serverEp.Tell(UdpConnected.Send.Create(data));

var msgs = client.ReceiveN(batchSize, TimeSpan.FromSeconds(10));
var cast = msgs.Cast<UdpConnected.Received>();
cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
serverEp.Tell(UdpConnected.Disconnect.Instance, server);
server.ExpectMsg<UdpConnected.Disconnected>();
clientEp.Tell(UdpConnected.Disconnect.Instance, client);
client.ExpectMsg<UdpConnected.Disconnected>();

// wait for all SocketAsyncEventArgs to be released
Thread.Sleep(1000);

poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);
}
}
}
127 changes: 114 additions & 13 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Akka.Actor;
using Akka.IO;
using Akka.IO.Buffers;
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
Expand All @@ -31,8 +33,15 @@ public UdpIntegrationSpec(ITestOutputHelper output)
akka.actor.serialize-messages = on
akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024", output)
akka.io.udp.buffer-pool = ""akka.io.udp.direct-buffer-pool""
akka.io.udp.nr-of-selectors = 1
# This comes out to be about 1.6 Mib maximum total buffer size
akka.io.udp.direct-buffer-pool.buffer-size = 512
akka.io.udp.direct-buffer-pool.buffers-per-segment = 32
akka.io.udp.direct-buffer-pool.buffer-pool-limit = 100
# akka.io.udp.trace-logging = true
akka.loglevel = DEBUG", output)
{
_addresses = TestUtils.TemporaryServerAddresses(6, udp: true).ToArray();
}
Expand Down Expand Up @@ -112,27 +121,28 @@ public void The_UDP_Fire_and_Forget_implementation_must_be_able_to_send_several_
{
var serverAddress = _addresses[0];
var clientAddress = _addresses[1];
var server = BindUdp(serverAddress, TestActor);
var client = BindUdp(clientAddress, TestActor);
var data = ByteString.FromString("Fly little packet!");
var serverProbe = CreateTestProbe();
var clientProbe = CreateTestProbe();
var server = BindUdp(serverAddress, serverProbe);
var client = BindUdp(clientAddress, clientProbe);

void CheckSendingToClient(int iteration)
{
server.Tell(Udp.Send.Create(data, clientAddress));
ExpectMsg<Udp.Received>(x =>
server.Tell(Udp.Send.Create(ByteString.FromString(iteration.ToString()), clientAddress));
clientProbe.ExpectMsg<Udp.Received>(x =>
{
x.Data.ShouldBe(data);
x.Sender.Is(serverAddress).ShouldBeTrue($"{x.Sender} was expected to be {serverAddress}");
x.Data.ToString().ShouldBe(iteration.ToString());
x.Sender.Is(serverAddress).ShouldBeTrue($"Client sender {x.Sender} was expected to be {serverAddress}");
}, hint: $"sending to client failed in {iteration} iteration");
}

void CheckSendingToServer(int iteration)
{
client.Tell(Udp.Send.Create(data, serverAddress));
ExpectMsg<Udp.Received>(x =>
client.Tell(Udp.Send.Create(ByteString.FromString(iteration.ToString()), serverAddress));
serverProbe.ExpectMsg<Udp.Received>(x =>
{
x.Data.ShouldBe(data);
Assert.True(x.Sender.Is(clientAddress));
x.Data.ToString().ShouldBe(iteration.ToString());
x.Sender.Is(clientAddress).ShouldBeTrue($"Server sender {x.Sender} was expected to be {clientAddress}");
}, hint: $"sending to client failed in {iteration} iteration");
}

Expand Down Expand Up @@ -194,6 +204,97 @@ void CheckSendingToServer(ByteString expected)
for (int i = 0; i < iterations; i++) CheckSendingToClient(data[i]);
}

[Fact]
public void The_UDP_Fire_and_Forget_implementation_must_not_leak_memory()
{
const int batchCount = 2000;
const int batchSize = 100;

var serverAddress = _addresses[0];
var clientAddress = _addresses[1];

var udp = Udp.Instance.Apply(Sys);
var poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);

var serverProbe = CreateTestProbe();
var server = BindUdp(serverAddress, serverProbe);
var clientProbe = CreateTestProbe();
var client = BindUdp(clientAddress, clientProbe);

var data = ByteString.FromString("Fly little packet!");

// send a lot of packets through, the byte buffer pool should not leak anything
for (var n = 0; n < batchCount; ++n)
{
for (var i = 0; i < batchSize; i++)
server.Tell(Udp.Send.Create(data, clientAddress));

var msgs = clientProbe.ReceiveN(batchSize);
var receives = msgs.Cast<Udp.Received>();
receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
server.Tell(Udp.Unbind.Instance, serverProbe);
serverProbe.ExpectMsg<Udp.Unbound>();
client.Tell(Udp.Unbind.Instance, clientProbe);
clientProbe.ExpectMsg<Udp.Unbound>();

// wait for all SocketAsyncEventArgs to be released
Thread.Sleep(1000);

poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);
}

[Fact]
public void The_UDP_Fire_and_Forget_SimpleSender_implementation_must_not_leak_memory()
{
const int batchCount = 2000;
const int batchSize = 100;

var udp = Udp.Instance.Apply(Sys);
var poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);

var serverAddress = _addresses[0];
var serverProbe = CreateTestProbe();
var server = BindUdp(serverAddress, serverProbe);
var sender = SimpleSender();

var data = ByteString.FromString("Fly little packet!");

// send a lot of packets through, the byte buffer pool should not leak anything
for (var n = 0; n < batchCount; ++n)
{
for (int i = 0; i < batchSize; i++)
sender.Tell(Udp.Send.Create(data, serverAddress));

var msgs = serverProbe.ReceiveN(batchSize);
var receives = msgs.Cast<Udp.Received>();
receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
server.Tell(Udp.Unbind.Instance, serverProbe);
serverProbe.ExpectMsg<Udp.Unbound>();

// wait for all SocketAsyncEventArgs to be released
Thread.Sleep(1000);

poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);
}

[Fact]
public void The_UDP_Fire_and_Forget_implementation_must_call_SocketOption_beforeBind_method_before_bind()
{
Expand Down
43 changes: 40 additions & 3 deletions src/core/Akka/IO/Buffers/DirectBufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.Serialization;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Debug = System.Diagnostics.Debug;

namespace Akka.IO.Buffers
{
Expand All @@ -33,6 +36,22 @@ protected BufferPoolAllocationException(SerializationInfo info, StreamingContext
}
}

public class BufferPoolInfo
{
public BufferPoolInfo(Type type, long totalSize, long free, long used)
{
Type = type;
TotalSize = totalSize;
Free = free;
Used = used;
}

public Type Type { get; }
public long TotalSize { get; }
public long Free { get; }
public long Used { get; }
}

/// <summary>
/// An interface used to acquire/release recyclable chunks of
/// bytes to be reused without need to triggering GC.
Expand Down Expand Up @@ -70,6 +89,8 @@ public interface IBufferPool
/// </summary>
/// <param name="buf"></param>
void Release(IEnumerable<ByteBuffer> buf);

BufferPoolInfo Diagnostics();
}

/// <summary>
Expand Down Expand Up @@ -131,6 +152,17 @@ public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegmen
}
}

public BufferPoolInfo Diagnostics()
=> new BufferPoolInfo(
type: typeof(DirectBufferPool),
totalSize: _segments.Count * _segmentSize,
free: _buffers.Count * _bufferSize,
used: (_segments.Count * _segmentSize) - (_buffers.Count * _bufferSize));

public override string ToString()
=> $"_bufferSize: [{_buffers}], _bufferPerSegment: [{_buffersPerSegment}], _segmentSize: [{_segmentSize}], " +
$"_segments.Count: [{_segments.Count}], _buffers.Count: [{_buffers.Count}]";

private void AllocateSegment()
{
lock (_syncRoot)
Expand Down Expand Up @@ -183,7 +215,7 @@ public IEnumerable<ByteBuffer> Rent(int minimumSize)
AllocateSegment();
}

throw new BufferPoolAllocationException($"Couldn't allocate enough byte buffer to fill the tolal requested size of {minimumSize} bytes");
throw new BufferPoolAllocationException($"Couldn't allocate enough byte buffer to fill the total requested size of {minimumSize} bytes");
}
catch
{
Expand All @@ -195,8 +227,13 @@ public IEnumerable<ByteBuffer> Rent(int minimumSize)
public void Release(ByteBuffer buf)
{
// only release buffers that have actually been taken from one of the segments
if (buf.Count == _bufferSize && _segments.Contains(buf.Array))
_buffers.Push(buf);
if (buf.Count != _bufferSize || !_segments.Contains(buf.Array))
{
Debug.Assert(false, "Wrong ArraySegment<byte> was returned to the pool. WARNING: This can lead to memory leak.");
return;
}

_buffers.Push(buf);
}

public void Release(IEnumerable<ByteBuffer> buffers)
Expand Down
Loading

0 comments on commit a920b07

Please sign in to comment.