Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix UDP memory leak #5404

Merged
merged 14 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3232,8 +3232,17 @@ namespace Akka.IO.Buffers
public BufferPoolAllocationException(string message) { }
protected BufferPoolAllocationException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class BufferPoolInfo
{
public BufferPoolInfo(System.Type type, long totalSize, long free, long used) { }
public long Free { get; }
public long TotalSize { get; }
public System.Type Type { get; }
public long Used { get; }
}
public interface IBufferPool
{
Akka.IO.Buffers.BufferPoolInfo Diagnostics();
void Release(System.ArraySegment<byte> buf);
void Release(System.Collections.Generic.IEnumerable<System.ArraySegment<byte>> buf);
System.ArraySegment<byte> Rent();
Expand Down Expand Up @@ -3354,6 +3363,7 @@ namespace Akka.IO
}
public interface ISocketEventArgsPool
{
Akka.IO.Buffers.BufferPoolInfo BufferPoolInfo { get; }
System.Net.Sockets.SocketAsyncEventArgs Acquire(Akka.Actor.IActorRef actor);
void Release(System.Net.Sockets.SocketAsyncEventArgs e);
}
Expand Down Expand Up @@ -3876,14 +3886,12 @@ namespace Akka.IO
{
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { }
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }
public Akka.IO.Buffers.IBufferPool BufferPool { get; }
public override Akka.Actor.IActorRef Manager { get; }
}
public class UdpExt : Akka.IO.IOExtension
{
public UdpExt(Akka.Actor.ExtendedActorSystem system) { }
public UdpExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }
public Akka.IO.Buffers.IBufferPool BufferPool { get; }
public override Akka.Actor.IActorRef Manager { get; }
}
public class static UdpExtensions
Expand Down
71 changes: 67 additions & 4 deletions src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
using System;
using System.Linq;
using System.Net;
using System.Threading;
using Akka.Actor;
using Akka.IO;
using Akka.IO.Buffers;
using Akka.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
Expand All @@ -26,12 +28,20 @@ public UdpConnectedIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on

akka.io.udp-connected.buffer-pool = ""akka.io.udp-connected.direct-buffer-pool""
akka.io.udp-connected.nr-of-selectors = 1
akka.io.udp-connected.direct-buffer-pool-limit = 100
akka.io.udp-connected.direct-buffer-size = 1024
# This comes out to be about 1.6 Mib maximum total buffer size
akka.io.udp-connected.direct-buffer-pool.buffer-size = 512
akka.io.udp-connected.direct-buffer-pool.buffers-per-segment = 32
akka.io.udp-connected.direct-buffer-pool.buffer-pool-limit = 100

akka.io.udp.buffer-pool = ""akka.io.udp.direct-buffer-pool""
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024
# This comes out to be about 1.6 Mib maximum total buffer size
akka.io.udp.direct-buffer-pool.buffer-size = 512
akka.io.udp.direct-buffer-pool.buffers-per-segment = 32
akka.io.udp.direct-buffer-pool.buffer-pool-limit = 100
akka.io.udp.trace-logging = true
akka.loglevel = DEBUG", output)
{
Expand Down Expand Up @@ -141,5 +151,58 @@ public void The_UDP_connection_oriented_implementation_must_to_send_batch_writes
msgs = raw.Cast<UdpConnected.Received>();
msgs.Sum(x => x.Data.Count).Should().Be(data.Count * 3);
}

[Fact]
public void The_UDP_connection_oriented_implementation_must_not_leak_memory()
{
const int batchCount = 2000;
const int batchSize = 100;

var serverAddress = _addresses[0];
var clientAddress = _addresses[1];
var udpConnection = UdpConnected.Instance.Apply(Sys);

var poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);

var server = CreateTestProbe();
udpConnection.Manager.Tell(new UdpConnected.Connect(server, clientAddress, serverAddress), server);
server.ExpectMsg<UdpConnected.Connected>();
var serverEp = server.LastSender;

var client = CreateTestProbe();
udpConnection.Manager.Tell(new UdpConnected.Connect(client, serverAddress, clientAddress), client);
client.ExpectMsg<UdpConnected.Connected>();
var clientEp = client.LastSender;

var data = ByteString.FromString("Fly little packet!");

// send a lot of packets through, the byte buffer pool should not leak anything
for (var n = 0; n < batchCount; ++n)
{
for (var j = 0; j < batchSize; ++j)
serverEp.Tell(UdpConnected.Send.Create(data));

var msgs = client.ReceiveN(batchSize, TimeSpan.FromSeconds(10));
var cast = msgs.Cast<UdpConnected.Received>();
cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
serverEp.Tell(UdpConnected.Disconnect.Instance, server);
server.ExpectMsg<UdpConnected.Disconnected>();
clientEp.Tell(UdpConnected.Disconnect.Instance, client);
client.ExpectMsg<UdpConnected.Disconnected>();

// wait for all SocketAsyncEventArgs to be released
Thread.Sleep(1000);

poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}
}
}
127 changes: 114 additions & 13 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Akka.Actor;
using Akka.IO;
using Akka.IO.Buffers;
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
Expand All @@ -31,8 +33,15 @@ public UdpIntegrationSpec(ITestOutputHelper output)
akka.actor.serialize-messages = on
akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024", output)

akka.io.udp.buffer-pool = ""akka.io.udp.direct-buffer-pool""
akka.io.udp.nr-of-selectors = 1
# This comes out to be about 1.6 Mib maximum total buffer size
akka.io.udp.direct-buffer-pool.buffer-size = 512
akka.io.udp.direct-buffer-pool.buffers-per-segment = 32
akka.io.udp.direct-buffer-pool.buffer-pool-limit = 100
# akka.io.udp.trace-logging = true
akka.loglevel = DEBUG", output)
{
_addresses = TestUtils.TemporaryServerAddresses(6, udp: true).ToArray();
}
Expand Down Expand Up @@ -112,27 +121,28 @@ public void The_UDP_Fire_and_Forget_implementation_must_be_able_to_send_several_
{
var serverAddress = _addresses[0];
var clientAddress = _addresses[1];
var server = BindUdp(serverAddress, TestActor);
var client = BindUdp(clientAddress, TestActor);
var data = ByteString.FromString("Fly little packet!");
var serverProbe = CreateTestProbe();
var clientProbe = CreateTestProbe();
var server = BindUdp(serverAddress, serverProbe);
var client = BindUdp(clientAddress, clientProbe);

void CheckSendingToClient(int iteration)
{
server.Tell(Udp.Send.Create(data, clientAddress));
ExpectMsg<Udp.Received>(x =>
server.Tell(Udp.Send.Create(ByteString.FromString(iteration.ToString()), clientAddress));
clientProbe.ExpectMsg<Udp.Received>(x =>
{
x.Data.ShouldBe(data);
x.Sender.Is(serverAddress).ShouldBeTrue($"{x.Sender} was expected to be {serverAddress}");
x.Data.ToString().ShouldBe(iteration.ToString());
x.Sender.Is(serverAddress).ShouldBeTrue($"Client sender {x.Sender} was expected to be {serverAddress}");
}, hint: $"sending to client failed in {iteration} iteration");
}

void CheckSendingToServer(int iteration)
{
client.Tell(Udp.Send.Create(data, serverAddress));
ExpectMsg<Udp.Received>(x =>
client.Tell(Udp.Send.Create(ByteString.FromString(iteration.ToString()), serverAddress));
serverProbe.ExpectMsg<Udp.Received>(x =>
{
x.Data.ShouldBe(data);
Assert.True(x.Sender.Is(clientAddress));
x.Data.ToString().ShouldBe(iteration.ToString());
x.Sender.Is(clientAddress).ShouldBeTrue($"Server sender {x.Sender} was expected to be {clientAddress}");
}, hint: $"sending to client failed in {iteration} iteration");
}

Expand Down Expand Up @@ -194,6 +204,97 @@ void CheckSendingToServer(ByteString expected)
for (int i = 0; i < iterations; i++) CheckSendingToClient(data[i]);
}

[Fact]
public void The_UDP_Fire_and_Forget_implementation_must_not_leak_memory()
{
const int batchCount = 2000;
const int batchSize = 100;

var serverAddress = _addresses[0];
var clientAddress = _addresses[1];

var udp = Udp.Instance.Apply(Sys);
var poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);

var serverProbe = CreateTestProbe();
var server = BindUdp(serverAddress, serverProbe);
var clientProbe = CreateTestProbe();
var client = BindUdp(clientAddress, clientProbe);

var data = ByteString.FromString("Fly little packet!");

// send a lot of packets through, the byte buffer pool should not leak anything
for (var n = 0; n < batchCount; ++n)
{
for (var i = 0; i < batchSize; i++)
server.Tell(Udp.Send.Create(data, clientAddress));

var msgs = clientProbe.ReceiveN(batchSize);
var receives = msgs.Cast<Udp.Received>();
receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
server.Tell(Udp.Unbind.Instance, serverProbe);
serverProbe.ExpectMsg<Udp.Unbound>();
client.Tell(Udp.Unbind.Instance, clientProbe);
clientProbe.ExpectMsg<Udp.Unbound>();

// wait for all SocketAsyncEventArgs to be released
Thread.Sleep(1000);

poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

[Fact]
public void The_UDP_Fire_and_Forget_SimpleSender_implementation_must_not_leak_memory()
{
const int batchCount = 2000;
const int batchSize = 100;

var udp = Udp.Instance.Apply(Sys);
var poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);

var serverAddress = _addresses[0];
var serverProbe = CreateTestProbe();
var server = BindUdp(serverAddress, serverProbe);
var sender = SimpleSender();

var data = ByteString.FromString("Fly little packet!");

// send a lot of packets through, the byte buffer pool should not leak anything
for (var n = 0; n < batchCount; ++n)
{
for (int i = 0; i < batchSize; i++)
sender.Tell(Udp.Send.Create(data, serverAddress));

var msgs = serverProbe.ReceiveN(batchSize);
var receives = msgs.Cast<Udp.Received>();
receives.Sum(r => r.Data.Count).Should().Be(data.Count * batchSize);
}

// stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected
server.Tell(Udp.Unbind.Instance, serverProbe);
serverProbe.ExpectMsg<Udp.Unbound>();

// wait for all SocketAsyncEventArgs to be released
Thread.Sleep(1000);

poolInfo = udp.SocketEventArgsPool.BufferPoolInfo;
poolInfo.Type.Should().Be(typeof(DirectBufferPool));
poolInfo.Free.Should().Be(poolInfo.TotalSize);
poolInfo.Used.Should().Be(0);
}

[Fact]
public void The_UDP_Fire_and_Forget_implementation_must_call_SocketOption_beforeBind_method_before_bind()
{
Expand Down
37 changes: 34 additions & 3 deletions src/core/Akka/IO/Buffers/DirectBufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ protected BufferPoolAllocationException(SerializationInfo info, StreamingContext
}
}

public class BufferPoolInfo
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
public BufferPoolInfo(Type type, long totalSize, long free, long used)
{
Type = type;
TotalSize = totalSize;
Free = free;
Used = used;
}

public Type Type { get; }
public long TotalSize { get; }
public long Free { get; }
public long Used { get; }
}

/// <summary>
/// An interface used to acquire/release recyclable chunks of
/// bytes to be reused without need to triggering GC.
Expand Down Expand Up @@ -70,6 +86,8 @@ public interface IBufferPool
/// </summary>
/// <param name="buf"></param>
void Release(IEnumerable<ByteBuffer> buf);

BufferPoolInfo Diagnostics();
}

/// <summary>
Expand Down Expand Up @@ -131,6 +149,17 @@ public DirectBufferPool(int bufferSize, int buffersPerSegment, int initialSegmen
}
}

public BufferPoolInfo Diagnostics()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea

=> new BufferPoolInfo(
type: typeof(DirectBufferPool),
totalSize: _segments.Count * _segmentSize,
free: _buffers.Count * _bufferSize,
used: (_segments.Count * _segmentSize) - (_buffers.Count * _bufferSize));

public override string ToString()
=> $"_bufferSize: [{_buffers}], _bufferPerSegment: [{_buffersPerSegment}], _segmentSize: [{_segmentSize}], " +
$"_segments.Count: [{_segments.Count}], _buffers.Count: [{_buffers.Count}]";

private void AllocateSegment()
{
lock (_syncRoot)
Expand Down Expand Up @@ -183,7 +212,7 @@ public IEnumerable<ByteBuffer> Rent(int minimumSize)
AllocateSegment();
}

throw new BufferPoolAllocationException($"Couldn't allocate enough byte buffer to fill the tolal requested size of {minimumSize} bytes");
throw new BufferPoolAllocationException($"Couldn't allocate enough byte buffer to fill the total requested size of {minimumSize} bytes");
}
catch
{
Expand All @@ -195,8 +224,10 @@ public IEnumerable<ByteBuffer> Rent(int minimumSize)
public void Release(ByteBuffer buf)
{
// only release buffers that have actually been taken from one of the segments
if (buf.Count == _bufferSize && _segments.Contains(buf.Array))
_buffers.Push(buf);
if (buf.Count != _bufferSize || !_segments.Contains(buf.Array))
throw new Exception("Wrong ArraySegment<byte> was released to DirectBufferPool");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sanity check to make sure that any ArraySegment<byte> returned to the buffer actually belongs to the buffer, should only happen during development.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it only happen during development?


_buffers.Push(buf);
}

public void Release(IEnumerable<ByteBuffer> buffers)
Expand Down
5 changes: 4 additions & 1 deletion src/core/Akka/IO/Buffers/DisabledBufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public void Release(IEnumerable<ByteBuffer> buffers)
Release(buf);
}
}


public BufferPoolInfo Diagnostics()
=> new BufferPoolInfo(typeof(DisabledBufferPool), 0, 0, 0);

private ByteBuffer RentOfSize(int size)
{
var bytes = new byte[size];
Expand Down
Loading