Skip to content

Commit

Permalink
Fix TcpOutgoingConnection memory leak (#3211) (#3212)
Browse files Browse the repository at this point in the history
* Fix TcpOutgoingConnection memory leak (#3211)
  • Loading branch information
yoPCix authored and Aaronontheweb committed Dec 15, 2017
1 parent 0b8fb51 commit 88d85c7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
4 changes: 3 additions & 1 deletion src/core/Akka/IO/SocketEventArgsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void Release(SocketAsyncEventArgs e)
}
catch (InvalidOperationException)
{
// it can be that for some reason socket is in use and haven't closed yet
// it can be that for some reason socket is in use and haven't closed yet. Dispose anyway to avoid leaks.
e.Dispose();
active--;
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,11 @@ protected override void PostStop()
if (IsWritePending)
{
pendingWrite.Release(); // we should release ConnectionInfo event args (if they're not released already)
ReleaseSocketAsyncEventArgs();
}

// always try to release SocketAsyncEventArgs to avoid memory leaks
ReleaseSocketAsyncEventArgs();

if (closedMessage != null)
{
var interestedInClose = IsWritePending
Expand Down
32 changes: 27 additions & 5 deletions src/core/Akka/IO/TcpOutgoingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection
{
private readonly IActorRef _commander;
private readonly Tcp.Connect _connect;


private SocketAsyncEventArgs _connectArgs;

public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
: base(tcp, new Socket(SocketType.Stream, ProtocolType.Tcp) { Blocking = false }, connect.PullMode)
{
Expand All @@ -44,8 +46,19 @@ public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connec
Context.SetReceiveTimeout(connect.Timeout.Value); //Initiate connection timeout if supplied
}

private void ReleaseConnectionSocketArgs()
{
if (_connectArgs != null)
{
Tcp.SocketEventArgsPool.Release(_connectArgs);
_connectArgs = null;
}
}

private void Stop()
{
ReleaseConnectionSocketArgs();

StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage));
}

Expand Down Expand Up @@ -87,6 +100,14 @@ protected override void PreStart()
});
}

protected override void PostStop()
{
// always try to release SocketAsyncEventArgs to avoid memory leaks
ReleaseConnectionSocketArgs();

base.PostStop();
}

protected override bool Receive(object message)
{
throw new NotSupportedException();
Expand Down Expand Up @@ -124,13 +145,13 @@ private void Register(IPEndPoint address, IPEndPoint fallbackAddress)
{
Log.Debug("Attempting connection to [{0}]", address);

var connectArgs = Tcp.SocketEventArgsPool.Acquire(Self);
connectArgs.RemoteEndPoint = address;
_connectArgs = Tcp.SocketEventArgsPool.Acquire(Self);
_connectArgs.RemoteEndPoint = address;
// we don't setup buffer here, it shouldn't be necessary just for connection
if (!Socket.ConnectAsync(connectArgs))
if (!Socket.ConnectAsync(_connectArgs))
Self.Tell(IO.Tcp.SocketConnected.Instance);

Become(Connecting(Tcp.Settings.FinishConnectRetries, connectArgs, fallbackAddress));
Become(Connecting(Tcp.Settings.FinishConnectRetries, _connectArgs, fallbackAddress));
});
}

Expand All @@ -145,6 +166,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null);
Log.Debug("Connection established to [{0}]", _connect.RemoteAddress);

ReleaseConnectionSocketArgs();
AcquireSocketAsyncEventArgs();

CompleteConnect(_commander, _connect.Options);
Expand Down

0 comments on commit 88d85c7

Please sign in to comment.